From 7904fb03a498a6ac515feeae06bbaaea4b09a845 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 25 Jul 2021 20:01:43 +0200 Subject: [PATCH] stream: duplexify --- lib/internal/streams/compose.js | 119 +--------- lib/internal/streams/duplexify.js | 286 ++++++++++++++++++++++++ lib/internal/streams/pipeline.js | 8 +- lib/internal/streams/utils.js | 10 + lib/stream.js | 4 + test/parallel/test-bootstrap-modules.js | 1 + 6 files changed, 309 insertions(+), 119 deletions(-) create mode 100644 lib/internal/streams/duplexify.js diff --git a/lib/internal/streams/compose.js b/lib/internal/streams/compose.js index bdfdc4cebe0c47..912f9d7793eb9b 100644 --- a/lib/internal/streams/compose.js +++ b/lib/internal/streams/compose.js @@ -2,50 +2,20 @@ const pipeline = require('internal/streams/pipeline'); const Duplex = require('internal/streams/duplex'); -const { createDeferredPromise } = require('internal/util'); const { destroyer } = require('internal/streams/destroy'); -const from = require('internal/streams/from'); const { isNodeStream, - isIterable, isReadable, isWritable, } = require('internal/streams/utils'); -const { - PromiseResolve, -} = primordials; +const duplexify = require('internal/streams/duplexify'); const { AbortError, codes: { - ERR_INVALID_ARG_TYPE, ERR_INVALID_ARG_VALUE, - ERR_INVALID_RETURN_VALUE, ERR_MISSING_ARGS, }, } = require('internal/errors'); -const assert = require('internal/assert'); - -// This is needed for pre node 17. -class ComposeDuplex extends Duplex { - constructor(options) { - super(options); - - // https://github.com/nodejs/node/pull/34385 - - if (options?.readable === false) { - this._readableState.readable = false; - this._readableState.ended = true; - this._readableState.endEmitted = true; - } - - if (options?.writable === false) { - this._writableState.writable = false; - this._writableState.ending = true; - this._writableState.ended = true; - this._writableState.finished = true; - } - } -} module.exports = function compose(...streams) { if (streams.length === 0) { @@ -53,18 +23,18 @@ module.exports = function compose(...streams) { } if (streams.length === 1) { - return makeDuplex(streams[0], 'streams[0]'); + return duplexify(streams[0], 'streams[0]'); } const orgStreams = [...streams]; if (typeof streams[0] === 'function') { - streams[0] = makeDuplex(streams[0], 'streams[0]'); + streams[0] = duplexify(streams[0], 'streams[0]'); } if (typeof streams[streams.length - 1] === 'function') { const idx = streams.length - 1; - streams[idx] = makeDuplex(streams[idx], `streams[${idx}]`); + streams[idx] = duplexify(streams[idx], `streams[${idx}]`); } for (let n = 0; n < streams.length; ++n) { @@ -116,7 +86,7 @@ module.exports = function compose(...streams) { // TODO(ronag): Avoid double buffering. // Implement Writable/Readable/Duplex traits. // See, https://github.com/nodejs/node/pull/33515. - d = new ComposeDuplex({ + d = new Duplex({ highWaterMark: 1, writableObjectMode: !!head?.writableObjectMode, readableObjectMode: !!tail?.writableObjectMode, @@ -203,82 +173,3 @@ module.exports = function compose(...streams) { return d; }; - -function makeDuplex(stream, name) { - let ret; - if (typeof stream === 'function') { - assert(stream.length > 0); - - const { value, write, final } = fromAsyncGen(stream); - - if (isIterable(value)) { - ret = from(ComposeDuplex, value, { - objectMode: true, - highWaterMark: 1, - write, - final - }); - } else if (typeof value?.then === 'function') { - const promise = PromiseResolve(value) - .then((val) => { - if (val != null) { - throw new ERR_INVALID_RETURN_VALUE('nully', name, val); - } - }) - .catch((err) => { - destroyer(ret, err); - }); - - ret = new ComposeDuplex({ - objectMode: true, - highWaterMark: 1, - readable: false, - write, - final(cb) { - final(() => promise.then(cb, cb)); - } - }); - } else { - throw new ERR_INVALID_RETURN_VALUE( - 'Iterable, AsyncIterable or AsyncFunction', name, value); - } - } else if (isNodeStream(stream)) { - ret = stream; - } else if (isIterable(stream)) { - ret = from(ComposeDuplex, stream, { - objectMode: true, - highWaterMark: 1, - writable: false - }); - } else { - throw new ERR_INVALID_ARG_TYPE( - name, - ['Stream', 'Iterable', 'AsyncIterable', 'Function'], - stream) - ; - } - return ret; -} - -function fromAsyncGen(fn) { - let { promise, resolve } = createDeferredPromise(); - const value = fn(async function*() { - while (true) { - const { chunk, done, cb } = await promise; - process.nextTick(cb); - if (done) return; - yield chunk; - ({ promise, resolve } = createDeferredPromise()); - } - }()); - - return { - value, - write(chunk, encoding, cb) { - resolve({ chunk, done: false, cb }); - }, - final(cb) { - resolve({ done: true, cb }); - } - }; -} diff --git a/lib/internal/streams/duplexify.js b/lib/internal/streams/duplexify.js new file mode 100644 index 00000000000000..72d75b4773430a --- /dev/null +++ b/lib/internal/streams/duplexify.js @@ -0,0 +1,286 @@ +'use strict'; + +const { + isReadable, + isWritable, + isIterable, + isNodeStream, + isReadableNodeStream, + isWritableNodeStream, + isDuplexNodeStream, +} = require('internal/streams/utils'); +const eos = require('internal/streams/end-of-stream'); +const { + AbortError, + codes: { + ERR_INVALID_ARG_TYPE, + ERR_INVALID_RETURN_VALUE, + }, +} = require('internal/errors'); +const { destroyer } = require('internal/streams/destroy'); +const Duplex = require('internal/streams/duplex'); +const Readable = require('internal/streams/readable'); +const Writable = require('internal/streams/writable'); +const { createDeferredPromise } = require('internal/util'); +const from = require('internal/streams/from'); + +const { + isBlob, +} = require('internal/blob'); + +const { + isBrandCheck, +} = require('internal/webstreams/util'); + +const isReadableStream = + isBrandCheck('ReadableStream'); +const isWritableStream = + isBrandCheck('WritableStream'); + +const { + PromiseResolve +} = primordials; + +module.exports = function duplexify(body, name) { + if (isDuplexNodeStream(body)) { + return body; + } else if (isReadableNodeStream(body)) { + return _duplexify({ readable: body }); + } else if (isWritableNodeStream(body)) { + return _duplexify({ writable: body }); + } else if (isNodeStream(body)) { + return _duplexify({ writable: false, readable: false }); + } else if (isReadableStream(body)) { + return _duplexify({ readable: Readable.fromWeb(body) }); + } else if (isWritableStream(body)) { + return _duplexify({ writable: Writable.fromWeb(body) }); + } else if (typeof body === 'function') { + const { value, write, final } = fromAsyncGen(body); + + if (isIterable(value)) { + return from(Duplex, value, { + objectMode: true, + highWaterMark: 1, + write, + final + }); + } + + if (typeof value?.then === 'function') { + let d; + + const promise = PromiseResolve(value) + .then((val) => { + if (val != null) { + throw new ERR_INVALID_RETURN_VALUE('nully', 'body', val); + } + }) + .catch((err) => { + destroyer(d, err); + }); + + return d = new Duplex({ + objectMode: true, + highWaterMark: 1, + readable: false, + write, + final(cb) { + final(() => promise.then(cb, cb)); + } + }); + } + + throw new ERR_INVALID_RETURN_VALUE( + 'Iterable, AsyncIterable or AsyncFunction', name, value); + } else if (isBlob(body)) { + return duplexify(async function* () { + yield * await body.arrayBuffer(); + }()); + } else if (isIterable(body)) { + return from(Duplex, body, { + objectMode: true, + highWaterMark: 1, + writable: false + }); + } else if ( + isReadableStream(body?.readable) && + isWritableStream(body?.writable) + ) { + return Duplex.fromWeb(body); + } else if ( + typeof body?.writable === 'object' || + typeof body?.readable === 'object' + ) { + const readable = body?.readable ? + isReadableNodeStream(body?.readable) ? body?.readable : + duplexify(body.readable, body.options ?? body.readableOptions) : + undefined; + + const writable = body?.writable ? + isWritableNodeStream(body?.writable) ? body?.writable : + duplexify(body.writable, body.options ?? body.writableOptions) : + undefined; + + return _duplexify({ readable, writable }); + } + + throw new ERR_INVALID_ARG_TYPE( + name, + ['Blob', 'ReadableStream', 'WritableStream', 'Stream', 'Iterable', + 'AsyncIterable', 'Function', '{ readable, writable } pair'], + body); +}; + +function fromAsyncGen(fn) { + let { promise, resolve } = createDeferredPromise(); + const value = fn(async function*() { + while (true) { + const { chunk, done, cb } = await promise; + process.nextTick(cb); + if (done) return; + yield chunk; + ({ promise, resolve } = createDeferredPromise()); + } + }()); + + return { + value, + write(chunk, encoding, cb) { + resolve({ chunk, done: false, cb }); + }, + final(cb) { + resolve({ done: true, cb }); + } + }; +} + +function _duplexify(pair) { + const r = pair.readable; + const w = pair.writable; + + let readable = !!isReadable(r); + let writable = !!isWritable(w); + + let ondrain; + let onfinish; + let onreadable; + let onclose; + let d; + + function onfinished(err) { + const cb = onclose; + onclose = null; + + if (cb) { + cb(err); + } else if (err) { + d.destroy(err); + } else if (!readable && !writable) { + d.destroy(); + } + } + + eos(r, (err) => { + readable = false; + if (err) { + destroyer(w, err); + } + onfinished(err); + }); + + eos(w, (err) => { + writable = false; + if (err) { + destroyer(r, err); + } + onfinished(err); + }); + + d = new Duplex({ + // TODO (ronag): highWaterMark? + readableObjectMode: !!r?.readableObjectMode, + writableObjectMode: !!w?.writableObjectMode, + readable, + writable, + }); + + if (writable) { + d._write = function(chunk, encoding, callback) { + if (w.write(chunk, encoding)) { + callback(); + } else { + ondrain = callback; + } + }; + + d._final = function(callback) { + w.end(); + onfinish = callback; + }; + + w.on('drain', function() { + if (ondrain) { + const cb = ondrain; + ondrain = null; + cb(); + } + }); + + w.on('finish', function() { + if (onfinish) { + const cb = onfinish; + onfinish = null; + cb(); + } + }); + } + + if (readable) { + r.on('readable', function() { + if (onreadable) { + const cb = onreadable; + onreadable = null; + cb(); + } + }); + + r.on('end', function() { + d.push(null); + }); + + d._read = function() { + while (true) { + const buf = r.read(); + + if (buf === null) { + onreadable = d._read; + return; + } + + if (!d.push(buf)) { + return; + } + } + }; + } + + d._destroy = function(err, callback) { + if (!err && onclose !== null) { + err = new AbortError(); + } + + onreadable = null; + ondrain = null; + onfinish = null; + + if (onclose === null) { + callback(err); + } else { + onclose = callback; + destroyer(w, err); + destroyer(r, err); + } + }; + + return d; +} diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 1b56c08b9e6958..7318cd617478b1 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -10,6 +10,7 @@ const { } = primordials; const eos = require('internal/streams/end-of-stream'); +const duplexify = require('internal/streams/duplexify'); const { once } = require('internal/util'); const destroyImpl = require('internal/streams/destroy'); @@ -219,9 +220,7 @@ function pipeline(...streams) { } else if (isIterable(stream) || isReadableNodeStream(stream)) { ret = stream; } else { - throw new ERR_INVALID_ARG_TYPE( - 'source', ['Stream', 'Iterable', 'AsyncIterable', 'Function'], - stream); + ret = duplexify(stream, 'source'); } } else if (typeof stream === 'function') { ret = makeAsyncIterable(ret); @@ -290,8 +289,7 @@ function pipeline(...streams) { ret = stream; } else { const name = reading ? `transform[${i - 1}]` : 'destination'; - throw new ERR_INVALID_ARG_TYPE( - name, ['Stream', 'Function'], stream); + ret = duplexify(stream, name); } } diff --git a/lib/internal/streams/utils.js b/lib/internal/streams/utils.js index 01396b5113340f..e6543c21a21334 100644 --- a/lib/internal/streams/utils.js +++ b/lib/internal/streams/utils.js @@ -27,6 +27,15 @@ function isWritableNodeStream(obj) { ); } +function isDuplexNodeStream(obj) { + return !!( + obj && + typeof obj.pipe === 'function' && + typeof obj.on === 'function' && + typeof obj.write === 'function' + ); +} + function isNodeStream(obj) { return ( obj && @@ -199,6 +208,7 @@ module.exports = { kDestroyed, isClosed, isDestroyed, + isDuplexNodeStream, isFinished, isIterable, isReadable, diff --git a/lib/stream.js b/lib/stream.js index b84efb0fd8862d..dec9cec695076c 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -34,6 +34,7 @@ const compose = require('internal/streams/compose'); const { destroyer } = require('internal/streams/destroy'); const eos = require('internal/streams/end-of-stream'); const internalBuffer = require('internal/buffer'); +const duplexify = require('internal/streams/duplexify'); const promises = require('stream/promises'); @@ -41,6 +42,9 @@ const Stream = module.exports = require('internal/streams/legacy').Stream; Stream.Readable = require('internal/streams/readable'); Stream.Writable = require('internal/streams/writable'); Stream.Duplex = require('internal/streams/duplex'); +Stream.Duplex.from = function from(body) { + return duplexify(body, 'body'); +}; Stream.Transform = require('internal/streams/transform'); Stream.PassThrough = require('internal/streams/passthrough'); Stream.pipeline = pipeline; diff --git a/test/parallel/test-bootstrap-modules.js b/test/parallel/test-bootstrap-modules.js index d02f0c71860554..1528a808a6dd03 100644 --- a/test/parallel/test-bootstrap-modules.js +++ b/test/parallel/test-bootstrap-modules.js @@ -104,6 +104,7 @@ const expectedModules = new Set([ 'NativeModule internal/streams/compose', 'NativeModule internal/streams/destroy', 'NativeModule internal/streams/duplex', + 'NativeModule internal/streams/duplexify', 'NativeModule internal/streams/end-of-stream', 'NativeModule internal/streams/from', 'NativeModule internal/streams/legacy',