Skip to content

Commit

Permalink
fix: change encoders/decoders for kernel-worker protocol endpoints
Browse files Browse the repository at this point in the history
Also remove the `setEncoding()` call on kernel-worker pipes, I think it was
disabling the objectMode settings
  • Loading branch information
warner committed Oct 1, 2020
1 parent e23b7bb commit 8eb13fa
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 22 deletions.
20 changes: 13 additions & 7 deletions packages/SwingSet/src/kernel/vatManager/subprocessSupervisor.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@ import '@agoric/install-ses';

import anylogger from 'anylogger';
import fs from 'fs';
import Netstring from 'netstring-stream';

import { assert } from '@agoric/assert';
import { importBundle } from '@agoric/import-bundle';
import { Remotable, getInterfaceOf, makeMarshal } from '@agoric/marshal';
import { arrayEncoderStream, arrayDecoderStream } from '../../worker-protocol';
import {
netstringEncoderStream,
netstringDecoderStream,
} from '../../netstring';
import { waitUntilQuiescent } from '../../waitUntilQuiescent';
import { makeLiveSlots } from '../liveSlots';

Expand Down Expand Up @@ -71,17 +75,19 @@ function doNotify(vpid, vp) {
}
}

const toParent = Netstring.writeStream();
toParent.pipe(fs.createWriteStream('IGNORED', { fd: 4, encoding: 'utf-8' }));
const toParent = arrayEncoderStream();
toParent
.pipe(netstringEncoderStream())
.pipe(fs.createWriteStream('IGNORED', { fd: 4, encoding: 'utf-8' }));

const fromParent = fs
.createReadStream('IGNORED', { fd: 3, encoding: 'utf-8' })
.pipe(Netstring.readStream());
fromParent.setEncoding('utf-8');
.pipe(netstringDecoderStream())
.pipe(arrayDecoderStream());

function sendUplink(msg) {
assert(msg instanceof Array, `msg must be an Array`);
toParent.write(JSON.stringify(msg));
toParent.write(msg);
}

// fromParent.on('data', data => {
Expand All @@ -90,7 +96,7 @@ function sendUplink(msg) {
// });

fromParent.on('data', data => {
const [type, ...margs] = JSON.parse(data);
const [type, ...margs] = data;
workerLog(`received`, type);
if (type === 'start') {
// TODO: parent should send ['start', vatID]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ export function makeNodeSubprocessFactory(tools) {
const { fromChild, toChild, terminate, done } = startSubprocessWorker();

function sendToWorker(msg) {
assert(msg instanceof Array);
toChild.write(JSON.stringify(msg));
assert(Array.isArray(msg));
toChild.write(msg);
}

const {
Expand Down Expand Up @@ -114,10 +114,7 @@ export function makeNodeSubprocessFactory(tools) {
}
}

fromChild.on('data', data => {
const msg = JSON.parse(data);
handleUpstream(msg);
});
fromChild.on('data', handleUpstream);

parentLog(`instructing worker to load bundle..`);
sendToWorker(['setBundle', bundle, vatParameters]);
Expand Down
24 changes: 15 additions & 9 deletions packages/SwingSet/src/spawnSubprocessWorker.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
// this file is loaded by the controller, in the start compartment
import { spawn } from 'child_process';
import Netstring from 'netstring-stream';

import { makePromiseKit } from '@agoric/promise-kit';
import { arrayEncoderStream, arrayDecoderStream } from './worker-protocol';
import { netstringEncoderStream, netstringDecoderStream } from './netstring';

// Start a subprocess from a given executable, and arrange a bidirectional
// message channel with a "supervisor" within that process. Return a {
// toChild, fromChild } pair of Streams which accept/emit hardened Arrays of
// JSON-serializable data.

// eslint-disable-next-line no-unused-vars
function parentLog(first, ...args) {
Expand All @@ -18,11 +23,12 @@ const stdio = harden(['inherit', 'inherit', 'inherit', 'pipe', 'pipe']);
export function startSubprocessWorker(execPath, procArgs = []) {
const proc = spawn(execPath, procArgs, { stdio });

const toChild = Netstring.writeStream();
toChild.pipe(proc.stdio[3]);
const toChild = arrayEncoderStream();
toChild.pipe(netstringEncoderStream()).pipe(proc.stdio[3]);
// proc.stdio[4].setEncoding('utf-8');
const fromChild = proc.stdio[4].pipe(Netstring.readStream());
fromChild.setEncoding('utf-8');
const fromChild = proc.stdio[4]
.pipe(netstringDecoderStream())
.pipe(arrayDecoderStream());

// fromChild.addListener('data', data => parentLog(`fd4 data`, data));
// toChild.write('hello child');
Expand All @@ -43,13 +49,13 @@ export function startSubprocessWorker(execPath, procArgs = []) {
proc.kill();
}

// the Netstring objects don't like being hardened, so we wrap the methods
// the Transform objects don't like being hardened, so we wrap the methods
// that get used
const wrappedFromChild = {
on: (evName, f) => fromChild.on(evName, f),
on: (...args) => fromChild.on(...args),
};
const wrappedToChild = {
write: data => toChild.write(data),
write: (...args) => toChild.write(...args),
};

return harden({
Expand Down

0 comments on commit 8eb13fa

Please sign in to comment.