Skip to content

Commit f7a4534

Browse files
committed
refactor websocket control frame handling
Co-authored-by: tai-kun <[email protected]> fixup state is never modified; doesn't need to be reset add test
1 parent eadf781 commit f7a4534

File tree

3 files changed

+189
-117
lines changed

3 files changed

+189
-117
lines changed

lib/web/websocket/receiver.js

+137-116
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
'use strict'
22

33
const { Writable } = require('node:stream')
4+
const assert = require('node:assert')
45
const { parserStates, opcodes, states, emptyBuffer, sentCloseFrameState } = require('./constants')
56
const { kReadyState, kSentClose, kResponse, kReceivedClose } = require('./symbols')
67
const { channels } = require('../../core/diagnostics')
7-
const { isValidStatusCode, failWebsocketConnection, websocketMessageReceived, utf8Decode } = require('./util')
8+
const { isValidStatusCode, failWebsocketConnection, websocketMessageReceived, utf8Decode, isControlFrame } = require('./util')
89
const { WebsocketFrameSend } = require('./frame')
910
const { CloseEvent } = require('./events')
1011

@@ -53,30 +54,39 @@ class ByteParser extends Writable {
5354
}
5455

5556
const buffer = this.consume(2)
57+
const fin = (buffer[0] & 0x80) !== 0
58+
const opcode = buffer[0] & 0x0F
59+
const masked = (buffer[1] & 0x80) === 0x80
5660

57-
this.#info.fin = (buffer[0] & 0x80) !== 0
58-
this.#info.opcode = buffer[0] & 0x0F
59-
this.#info.masked = (buffer[1] & 0x80) === 0x80
60-
61-
if (this.#info.masked) {
61+
if (masked) {
6262
failWebsocketConnection(this.ws, 'Frame cannot be masked')
6363
return callback()
6464
}
6565

66-
// If we receive a fragmented message, we use the type of the first
67-
// frame to parse the full message as binary/text, when it's terminated
68-
this.#info.originalOpcode ??= this.#info.opcode
69-
70-
this.#info.fragmented = !this.#info.fin && this.#info.opcode !== opcodes.CONTINUATION
66+
const fragmented = !fin && opcode !== opcodes.CONTINUATION
7167

72-
if (this.#info.fragmented && this.#info.opcode !== opcodes.BINARY && this.#info.opcode !== opcodes.TEXT) {
68+
if (fragmented && opcode !== opcodes.BINARY && opcode !== opcodes.TEXT) {
7369
// Only text and binary frames can be fragmented
7470
failWebsocketConnection(this.ws, 'Invalid frame type was fragmented.')
7571
return
7672
}
7773

7874
const payloadLength = buffer[1] & 0x7F
7975

76+
if (isControlFrame(opcode)) {
77+
const loop = this.parseControlFrame(callback, {
78+
opcode,
79+
fragmented,
80+
payloadLength
81+
})
82+
83+
if (loop) {
84+
continue
85+
} else {
86+
return
87+
}
88+
}
89+
8090
if (payloadLength <= 125) {
8191
this.#info.payloadLength = payloadLength
8292
this.#state = parserStates.READ_DATA
@@ -86,114 +96,18 @@ class ByteParser extends Writable {
8696
this.#state = parserStates.PAYLOADLENGTH_64
8797
}
8898

99+
// TODO(@KhafraDev): handle continuation frames separately as their
100+
// semantics are different from TEXT/BINARY frames.
101+
this.#info.originalOpcode ??= opcode
102+
this.#info.opcode = opcode
103+
this.#info.masked = masked
104+
this.#info.fin = fin
105+
this.#info.fragmented = fragmented
106+
89107
if (this.#info.fragmented && payloadLength > 125) {
90108
// A fragmented frame can't be fragmented itself
91109
failWebsocketConnection(this.ws, 'Fragmented frame exceeded 125 bytes.')
92110
return
93-
} else if (
94-
(this.#info.opcode === opcodes.PING ||
95-
this.#info.opcode === opcodes.PONG ||
96-
this.#info.opcode === opcodes.CLOSE) &&
97-
payloadLength > 125
98-
) {
99-
// Control frames can have a payload length of 125 bytes MAX
100-
failWebsocketConnection(this.ws, 'Payload length for control frame exceeded 125 bytes.')
101-
return
102-
} else if (this.#info.opcode === opcodes.CLOSE) {
103-
if (payloadLength === 1) {
104-
failWebsocketConnection(this.ws, 'Received close frame with a 1-byte body.')
105-
return
106-
}
107-
108-
const body = this.consume(payloadLength)
109-
110-
this.#info.closeInfo = this.parseCloseBody(body)
111-
112-
if (this.#info.closeInfo.error) {
113-
const { code, reason } = this.#info.closeInfo
114-
115-
callback(new CloseEvent('close', { wasClean: false, reason, code }))
116-
return
117-
}
118-
119-
if (this.ws[kSentClose] !== sentCloseFrameState.SENT) {
120-
// If an endpoint receives a Close frame and did not previously send a
121-
// Close frame, the endpoint MUST send a Close frame in response. (When
122-
// sending a Close frame in response, the endpoint typically echos the
123-
// status code it received.)
124-
let body = emptyBuffer
125-
if (this.#info.closeInfo.code) {
126-
body = Buffer.allocUnsafe(2)
127-
body.writeUInt16BE(this.#info.closeInfo.code, 0)
128-
}
129-
const closeFrame = new WebsocketFrameSend(body)
130-
131-
this.ws[kResponse].socket.write(
132-
closeFrame.createFrame(opcodes.CLOSE),
133-
(err) => {
134-
if (!err) {
135-
this.ws[kSentClose] = sentCloseFrameState.SENT
136-
}
137-
}
138-
)
139-
}
140-
141-
// Upon either sending or receiving a Close control frame, it is said
142-
// that _The WebSocket Closing Handshake is Started_ and that the
143-
// WebSocket connection is in the CLOSING state.
144-
this.ws[kReadyState] = states.CLOSING
145-
this.ws[kReceivedClose] = true
146-
147-
this.end()
148-
149-
return
150-
} else if (this.#info.opcode === opcodes.PING) {
151-
// Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
152-
// response, unless it already received a Close frame.
153-
// A Pong frame sent in response to a Ping frame must have identical
154-
// "Application data"
155-
156-
const body = this.consume(payloadLength)
157-
158-
if (!this.ws[kReceivedClose]) {
159-
const frame = new WebsocketFrameSend(body)
160-
161-
this.ws[kResponse].socket.write(frame.createFrame(opcodes.PONG))
162-
163-
if (channels.ping.hasSubscribers) {
164-
channels.ping.publish({
165-
payload: body
166-
})
167-
}
168-
}
169-
170-
this.#state = parserStates.INFO
171-
172-
if (this.#byteOffset > 0) {
173-
continue
174-
} else {
175-
callback()
176-
return
177-
}
178-
} else if (this.#info.opcode === opcodes.PONG) {
179-
// A Pong frame MAY be sent unsolicited. This serves as a
180-
// unidirectional heartbeat. A response to an unsolicited Pong frame is
181-
// not expected.
182-
183-
const body = this.consume(payloadLength)
184-
185-
if (channels.pong.hasSubscribers) {
186-
channels.pong.publish({
187-
payload: body
188-
})
189-
}
190-
191-
if (this.#byteOffset > 0) {
192-
continue
193-
} else {
194-
callback()
195-
return
196-
}
197111
}
198112
} else if (this.#state === parserStates.PAYLOADLENGTH_16) {
199113
if (this.#byteOffset < 2) {
@@ -303,6 +217,8 @@ class ByteParser extends Writable {
303217
}
304218

305219
parseCloseBody (data) {
220+
assert(data.length !== 1)
221+
306222
// https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.5
307223
/** @type {number|undefined} */
308224
let code
@@ -336,6 +252,111 @@ class ByteParser extends Writable {
336252
return { code, reason, error: false }
337253
}
338254

255+
/**
256+
* Parses control frames.
257+
* @param {Buffer} data
258+
* @param {(err?: Error) => void} callback
259+
* @param {{ opcode: number, fragmented: boolean, payloadLength: number }} info
260+
*/
261+
parseControlFrame (callback, info) {
262+
assert(!info.fragmented)
263+
264+
if (info.payloadLength > 125) {
265+
// Control frames can have a payload length of 125 bytes MAX
266+
callback(new Error('Payload length for control frame exceeded 125 bytes.'))
267+
return false
268+
}
269+
270+
const body = this.consume(info.payloadLength)
271+
272+
if (info.opcode === opcodes.CLOSE) {
273+
if (info.payloadLength === 1) {
274+
failWebsocketConnection(this.ws, 'Received close frame with a 1-byte body.')
275+
return
276+
}
277+
278+
this.#info.closeInfo = this.parseCloseBody(body)
279+
280+
if (this.#info.closeInfo.error) {
281+
const { code, reason } = this.#info.closeInfo
282+
283+
callback(new CloseEvent('close', { wasClean: false, reason, code }))
284+
return
285+
}
286+
287+
if (this.ws[kSentClose] !== sentCloseFrameState.SENT) {
288+
// If an endpoint receives a Close frame and did not previously send a
289+
// Close frame, the endpoint MUST send a Close frame in response. (When
290+
// sending a Close frame in response, the endpoint typically echos the
291+
// status code it received.)
292+
let body = emptyBuffer
293+
if (this.#info.closeInfo.code) {
294+
body = Buffer.allocUnsafe(2)
295+
body.writeUInt16BE(this.#info.closeInfo.code, 0)
296+
}
297+
const closeFrame = new WebsocketFrameSend(body)
298+
299+
this.ws[kResponse].socket.write(
300+
closeFrame.createFrame(opcodes.CLOSE),
301+
(err) => {
302+
if (!err) {
303+
this.ws[kSentClose] = sentCloseFrameState.SENT
304+
}
305+
}
306+
)
307+
}
308+
309+
// Upon either sending or receiving a Close control frame, it is said
310+
// that _The WebSocket Closing Handshake is Started_ and that the
311+
// WebSocket connection is in the CLOSING state.
312+
this.ws[kReadyState] = states.CLOSING
313+
this.ws[kReceivedClose] = true
314+
315+
this.end()
316+
317+
return
318+
} else if (info.opcode === opcodes.PING) {
319+
// Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
320+
// response, unless it already received a Close frame.
321+
// A Pong frame sent in response to a Ping frame must have identical
322+
// "Application data"
323+
324+
if (!this.ws[kReceivedClose]) {
325+
const frame = new WebsocketFrameSend(body)
326+
327+
this.ws[kResponse].socket.write(frame.createFrame(opcodes.PONG))
328+
329+
if (channels.ping.hasSubscribers) {
330+
channels.ping.publish({
331+
payload: body
332+
})
333+
}
334+
}
335+
336+
if (this.#byteOffset <= 0) {
337+
callback()
338+
return false
339+
}
340+
} else if (info.opcode === opcodes.PONG) {
341+
// A Pong frame MAY be sent unsolicited. This serves as a
342+
// unidirectional heartbeat. A response to an unsolicited Pong frame is
343+
// not expected.
344+
345+
if (channels.pong.hasSubscribers) {
346+
channels.pong.publish({
347+
payload: body
348+
})
349+
}
350+
351+
if (this.#byteOffset <= 0) {
352+
callback()
353+
return false
354+
}
355+
}
356+
357+
return true
358+
}
359+
339360
get closingInfo () {
340361
return this.#info.closeInfo
341362
}

lib/web/websocket/util.js

+14-1
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,18 @@ function failWebsocketConnection (ws, reason) {
210210
}
211211
}
212212

213+
/**
214+
* @see https://datatracker.ietf.org/doc/html/rfc6455#section-5.5
215+
* @param {number} opcode
216+
*/
217+
function isControlFrame (opcode) {
218+
return (
219+
opcode === opcodes.CLOSE ||
220+
opcode === opcodes.PING ||
221+
opcode === opcodes.PONG
222+
)
223+
}
224+
213225
// https://nodejs.org/api/intl.html#detecting-internationalization-support
214226
const hasIntl = typeof process.versions.icu === 'string'
215227
const fatalDecoder = hasIntl ? new TextDecoder('utf-8', { fatal: true }) : undefined
@@ -237,5 +249,6 @@ module.exports = {
237249
isValidStatusCode,
238250
failWebsocketConnection,
239251
websocketMessageReceived,
240-
utf8Decode
252+
utf8Decode,
253+
isControlFrame
241254
}

test/websocket/issue-2859.js

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
'use strict'
2+
3+
const { test } = require('node:test')
4+
const { WebSocketServer } = require('ws')
5+
const { WebSocket } = require('../..')
6+
const diagnosticsChannel = require('node:diagnostics_channel')
7+
const { tspl } = require('@matteo.collina/tspl')
8+
9+
test('Fragmented frame with a ping frame in the first of it', async (t) => {
10+
const { completed, deepStrictEqual, strictEqual } = tspl(t, { plan: 2 })
11+
12+
const server = new WebSocketServer({ port: 0 })
13+
14+
server.on('connection', (ws) => {
15+
const socket = ws._socket
16+
17+
socket.write(Buffer.from([0x89, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f])) // ping "Hello"
18+
socket.write(Buffer.from([0x01, 0x03, 0x48, 0x65, 0x6c])) // Text frame "Hel"
19+
socket.write(Buffer.from([0x80, 0x02, 0x6c, 0x6f])) // Text frame "lo"
20+
})
21+
22+
t.after(() => {
23+
server.close()
24+
ws.close()
25+
})
26+
27+
const ws = new WebSocket(`ws://127.0.0.1:${server.address().port}`)
28+
29+
diagnosticsChannel.channel('undici:websocket:ping').subscribe(
30+
({ payload }) => deepStrictEqual(payload, Buffer.from('Hello'))
31+
)
32+
33+
ws.addEventListener('message', ({ data }) => {
34+
strictEqual(data, 'Hello')
35+
})
36+
37+
await completed
38+
})

0 commit comments

Comments
 (0)