diff --git a/packages/SwingSet/src/kernel/vatManager/subprocessSupervisor.js b/packages/SwingSet/src/kernel/vatManager/subprocessSupervisor.js index 5ed4a2dd7a3..a3247d24481 100644 --- a/packages/SwingSet/src/kernel/vatManager/subprocessSupervisor.js +++ b/packages/SwingSet/src/kernel/vatManager/subprocessSupervisor.js @@ -3,11 +3,15 @@ import '@agoric/install-ses'; import anylogger from 'anylogger'; import fs from 'fs'; -import Netstring from 'netstring-stream'; import { assert } from '@agoric/assert'; import { importBundle } from '@agoric/import-bundle'; import { Remotable, getInterfaceOf, makeMarshal } from '@agoric/marshal'; +import { arrayEncoderStream, arrayDecoderStream } from '../../worker-protocol'; +import { + netstringEncoderStream, + netstringDecoderStream, +} from '../../netstring'; import { waitUntilQuiescent } from '../../waitUntilQuiescent'; import { makeLiveSlots } from '../liveSlots'; @@ -71,17 +75,19 @@ function doNotify(vpid, vp) { } } -const toParent = Netstring.writeStream(); -toParent.pipe(fs.createWriteStream('IGNORED', { fd: 4, encoding: 'utf-8' })); +const toParent = arrayEncoderStream(); +toParent + .pipe(netstringEncoderStream()) + .pipe(fs.createWriteStream('IGNORED', { fd: 4, encoding: 'utf-8' })); const fromParent = fs .createReadStream('IGNORED', { fd: 3, encoding: 'utf-8' }) - .pipe(Netstring.readStream()); -fromParent.setEncoding('utf-8'); + .pipe(netstringDecoderStream()) + .pipe(arrayDecoderStream()); function sendUplink(msg) { assert(msg instanceof Array, `msg must be an Array`); - toParent.write(JSON.stringify(msg)); + toParent.write(msg); } // fromParent.on('data', data => { @@ -90,7 +96,7 @@ function sendUplink(msg) { // }); fromParent.on('data', data => { - const [type, ...margs] = JSON.parse(data); + const [type, ...margs] = data; workerLog(`received`, type); if (type === 'start') { // TODO: parent should send ['start', vatID] diff --git a/packages/SwingSet/src/kernel/vatManager/worker-subprocess-node.js b/packages/SwingSet/src/kernel/vatManager/worker-subprocess-node.js index d5d1bfc466a..7c4f45db8c4 100644 --- a/packages/SwingSet/src/kernel/vatManager/worker-subprocess-node.js +++ b/packages/SwingSet/src/kernel/vatManager/worker-subprocess-node.js @@ -75,8 +75,8 @@ export function makeNodeSubprocessFactory(tools) { const { fromChild, toChild, terminate, done } = startSubprocessWorker(); function sendToWorker(msg) { - assert(msg instanceof Array); - toChild.write(JSON.stringify(msg)); + assert(Array.isArray(msg)); + toChild.write(msg); } const { @@ -114,10 +114,7 @@ export function makeNodeSubprocessFactory(tools) { } } - fromChild.on('data', data => { - const msg = JSON.parse(data); - handleUpstream(msg); - }); + fromChild.on('data', handleUpstream); parentLog(`instructing worker to load bundle..`); sendToWorker(['setBundle', bundle, vatParameters]); diff --git a/packages/SwingSet/src/spawnSubprocessWorker.js b/packages/SwingSet/src/spawnSubprocessWorker.js index e02008d6225..355e44d4e9c 100644 --- a/packages/SwingSet/src/spawnSubprocessWorker.js +++ b/packages/SwingSet/src/spawnSubprocessWorker.js @@ -1,8 +1,13 @@ // this file is loaded by the controller, in the start compartment import { spawn } from 'child_process'; -import Netstring from 'netstring-stream'; - import { makePromiseKit } from '@agoric/promise-kit'; +import { arrayEncoderStream, arrayDecoderStream } from './worker-protocol'; +import { netstringEncoderStream, netstringDecoderStream } from './netstring'; + +// Start a subprocess from a given executable, and arrange a bidirectional +// message channel with a "supervisor" within that process. Return a { +// toChild, fromChild } pair of Streams which accept/emit hardened Arrays of +// JSON-serializable data. // eslint-disable-next-line no-unused-vars function parentLog(first, ...args) { @@ -18,11 +23,12 @@ const stdio = harden(['inherit', 'inherit', 'inherit', 'pipe', 'pipe']); export function startSubprocessWorker(execPath, procArgs = []) { const proc = spawn(execPath, procArgs, { stdio }); - const toChild = Netstring.writeStream(); - toChild.pipe(proc.stdio[3]); + const toChild = arrayEncoderStream(); + toChild.pipe(netstringEncoderStream()).pipe(proc.stdio[3]); // proc.stdio[4].setEncoding('utf-8'); - const fromChild = proc.stdio[4].pipe(Netstring.readStream()); - fromChild.setEncoding('utf-8'); + const fromChild = proc.stdio[4] + .pipe(netstringDecoderStream()) + .pipe(arrayDecoderStream()); // fromChild.addListener('data', data => parentLog(`fd4 data`, data)); // toChild.write('hello child'); @@ -43,13 +49,13 @@ export function startSubprocessWorker(execPath, procArgs = []) { proc.kill(); } - // the Netstring objects don't like being hardened, so we wrap the methods + // the Transform objects don't like being hardened, so we wrap the methods // that get used const wrappedFromChild = { - on: (evName, f) => fromChild.on(evName, f), + on: (...args) => fromChild.on(...args), }; const wrappedToChild = { - write: data => toChild.write(data), + write: (...args) => toChild.write(...args), }; return harden({