Skip to content

Commit f663b31

Browse files
ronagTrott
authored andcommitted
stream: always invoke callback before emitting error
Ensure the callback is always invoked before emitting the error in both sync and async case. PR-URL: #29293 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: James M Snell <[email protected]>
1 parent 634a9a9 commit f663b31

7 files changed

+116
-20
lines changed

doc/api/stream.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -571,7 +571,8 @@ The `writable.write()` method writes some data to the stream, and calls the
571571
supplied `callback` once the data has been fully handled. If an error
572572
occurs, the `callback` *may or may not* be called with the error as its
573573
first argument. To reliably detect write errors, add a listener for the
574-
`'error'` event.
574+
`'error'` event. If `callback` is called with an error, it will be called
575+
before the `'error'` event is emitted.
575576

576577
The return value is `true` if the internal buffer is less than the
577578
`highWaterMark` configured when the stream was created after admitting `chunk`.

lib/_stream_writable.js

+21-16
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,11 @@ function WritableState(options, stream, isDuplex) {
158158
// Should .destroy() be called after 'finish' (and potentially 'end')
159159
this.autoDestroy = !!(options && options.autoDestroy);
160160

161+
// Indicates whether the stream has errored. When true all write() calls
162+
// should return false. This is needed since when autoDestroy
163+
// is disabled we need a way to tell whether the stream has failed.
164+
this.errored = false;
165+
161166
// Count buffered requests
162167
this.bufferedRequestCount = 0;
163168

@@ -401,7 +406,7 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
401406
if (!ret)
402407
state.needDrain = true;
403408

404-
if (state.writing || state.corked) {
409+
if (state.writing || state.corked || state.errored) {
405410
var last = state.lastBufferedRequest;
406411
state.lastBufferedRequest = {
407412
chunk,
@@ -420,7 +425,9 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
420425
doWrite(stream, state, false, len, chunk, encoding, cb);
421426
}
422427

423-
return ret;
428+
// Return false if errored or destroyed in order to break
429+
// any synchronous while(stream.write(data)) loops.
430+
return ret && !state.errored && !state.destroyed;
424431
}
425432

426433
function doWrite(stream, state, writev, len, chunk, encoding, cb) {
@@ -437,18 +444,11 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb) {
437444
state.sync = false;
438445
}
439446

440-
function onwriteError(stream, state, sync, er, cb) {
447+
function onwriteError(stream, state, er, cb) {
441448
--state.pendingcb;
442449

443-
if (sync) {
444-
// Defer the callback if we are being called synchronously
445-
// to avoid piling up things on the stack
446-
process.nextTick(cb, er);
447-
} else {
448-
// The caller expect this to happen before if
449-
// it is async
450-
cb(er);
451-
}
450+
cb(er);
451+
// This can emit error, but error must always follow cb.
452452
errorOrDestroy(stream, er);
453453
}
454454

@@ -465,9 +465,14 @@ function onwrite(stream, er) {
465465
state.length -= state.writelen;
466466
state.writelen = 0;
467467

468-
if (er)
469-
onwriteError(stream, state, sync, er, cb);
470-
else {
468+
if (er) {
469+
state.errored = true;
470+
if (sync) {
471+
process.nextTick(onwriteError, stream, state, er, cb);
472+
} else {
473+
onwriteError(stream, state, er, cb);
474+
}
475+
} else {
471476
// Check if we're actually ready to finish, but don't emit yet
472477
var finished = needFinish(state) || stream.destroyed;
473478

@@ -622,7 +627,7 @@ Object.defineProperty(Writable.prototype, 'writableLength', {
622627
function needFinish(state) {
623628
return (state.ending &&
624629
state.length === 0 &&
625-
!state.errorEmitted &&
630+
!state.errored &&
626631
state.bufferedRequest === null &&
627632
!state.finished &&
628633
!state.writing);

lib/internal/streams/destroy.js

+12-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ function destroy(err, cb) {
2727
const r = this._readableState;
2828
const w = this._writableState;
2929

30+
if (w && err) {
31+
w.errored = true;
32+
}
33+
3034
if ((w && w.destroyed) || (r && r.destroyed)) {
3135
if (cb) {
3236
cb(err);
@@ -50,10 +54,12 @@ function destroy(err, cb) {
5054
this._destroy(err || null, (err) => {
5155
const emitClose = (w && w.emitClose) || (r && r.emitClose);
5256
if (cb) {
57+
// Invoke callback before scheduling emitClose so that callback
58+
// can schedule before.
59+
cb(err);
5360
if (emitClose) {
5461
process.nextTick(emitCloseNT, this);
5562
}
56-
cb(err);
5763
} else if (needError(this, err)) {
5864
process.nextTick(emitClose ? emitErrorCloseNT : emitErrorNT, this, err);
5965
} else if (emitClose) {
@@ -91,6 +97,7 @@ function undestroy() {
9197

9298
if (w) {
9399
w.destroyed = false;
100+
w.errored = false;
94101
w.ended = false;
95102
w.ending = false;
96103
w.finalCalled = false;
@@ -110,6 +117,10 @@ function errorOrDestroy(stream, err) {
110117
const r = stream._readableState;
111118
const w = stream._writableState;
112119

120+
if (w & err) {
121+
w.errored = true;
122+
}
123+
113124
if ((r && r.autoDestroy) || (w && w.autoDestroy))
114125
stream.destroy(err);
115126
else if (needError(stream, err))

test/parallel/test-http2-reset-flood.js

+4-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,10 @@ const worker = new Worker(__filename).on('message', common.mustCall((port) => {
6767
h2header.writeIntBE(1, 0, 3); // Length: 1
6868
h2header.writeIntBE(i, 5, 4); // Stream ID
6969
// 0x88 = :status: 200
70-
conn.write(Buffer.concat([h2header, Buffer.from([0x88])]));
70+
if (!conn.write(Buffer.concat([h2header, Buffer.from([0x88])]))) {
71+
process.nextTick(writeRequests);
72+
break;
73+
}
7174
}
7275
}
7376

test/parallel/test-stream-writable-destroy.js

+14
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,20 @@ const assert = require('assert');
1616
assert.strictEqual(write.destroyed, true);
1717
}
1818

19+
{
20+
const write = new Writable({
21+
write(chunk, enc, cb) {
22+
this.destroy(new Error('asd'));
23+
cb();
24+
}
25+
});
26+
27+
write.on('error', common.mustCall());
28+
write.on('finish', common.mustNotCall());
29+
write.end('asd');
30+
assert.strictEqual(write.destroyed, true);
31+
}
32+
1933
{
2034
const write = new Writable({
2135
write(chunk, enc, cb) { cb(); }
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
'use strict';
2+
const common = require('../common');
3+
const { Writable } = require('stream');
4+
const assert = require('assert');
5+
6+
// Ensure callback is always invoked before
7+
// error is emitted. Regardless if error was
8+
// sync or async.
9+
10+
{
11+
let callbackCalled = false;
12+
// Sync Error
13+
const writable = new Writable({
14+
write: common.mustCall((buf, enc, cb) => {
15+
cb(new Error());
16+
})
17+
});
18+
writable.on('error', common.mustCall(() => {
19+
assert.strictEqual(callbackCalled, true);
20+
}));
21+
writable.write('hi', common.mustCall(() => {
22+
callbackCalled = true;
23+
}));
24+
}
25+
26+
{
27+
let callbackCalled = false;
28+
// Async Error
29+
const writable = new Writable({
30+
write: common.mustCall((buf, enc, cb) => {
31+
process.nextTick(cb, new Error());
32+
})
33+
});
34+
writable.on('error', common.mustCall(() => {
35+
assert.strictEqual(callbackCalled, true);
36+
}));
37+
writable.write('hi', common.mustCall(() => {
38+
callbackCalled = true;
39+
}));
40+
}
41+
42+
{
43+
// Sync Error
44+
const writable = new Writable({
45+
write: common.mustCall((buf, enc, cb) => {
46+
cb(new Error());
47+
})
48+
});
49+
50+
writable.on('error', common.mustCall());
51+
52+
let cnt = 0;
53+
// Ensure we don't live lock on sync error
54+
while (writable.write('a'))
55+
cnt++;
56+
57+
assert.strictEqual(cnt, 0);
58+
}

test/parallel/test-wrap-js-stream-exceptions.js

+5-1
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,8 @@ const socket = new JSStreamWrap(new Duplex({
1616
})
1717
}));
1818

19-
assert.throws(() => socket.end('foo'), /Error: write EPROTO/);
19+
socket.end('foo');
20+
socket.on('error', common.expectsError({
21+
type: Error,
22+
message: 'write EPROTO'
23+
}));

0 commit comments

Comments
 (0)