Skip to content

Commit

Permalink
feat: add stream support and fix some bugs (#28)
Browse files Browse the repository at this point in the history
* feat: add stream support and fix some bugs

* fix: typo length

---------

Co-authored-by: 唐烨 <[email protected]>
  • Loading branch information
tangye1234 and 唐烨 authored Mar 31, 2023
1 parent e61a34a commit b0c4161
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 28 deletions.
46 changes: 39 additions & 7 deletions src/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,22 @@ export const getRequestListener = (fetchCallback: FetchCallback) => {

try {
res = (await fetchCallback(new Request(url.toString(), init))) as Response
} catch {
} catch (e: unknown) {
res = new Response(null, { status: 500 })
if (e instanceof Error) {
// timeout error emits 504 timeout
if (e.name === 'TimeoutError' || e.constructor.name === 'TimeoutError') {
res = new Response(null, { status: 504 })
}
}
}

const contentType = res.headers.get('content-type') || ''
// nginx buffering variant
const buffering = res.headers.get('x-accel-buffering') || ''
const contentEncoding = res.headers.get('content-encoding')
const contentLength = res.headers.get('content-length')
const transferEncoding = res.headers.get('transfer-encoding')

for (const [k, v] of res.headers) {
if (k === 'set-cookie') {
Expand All @@ -55,12 +65,34 @@ export const getRequestListener = (fetchCallback: FetchCallback) => {
outgoing.statusCode = res.status

if (res.body) {
if (!contentEncoding && contentType.startsWith('text')) {
outgoing.end(await res.text())
} else if (!contentEncoding && contentType.startsWith('application/json')) {
outgoing.end(await res.text())
} else {
await writeReadableStreamToWritable(res.body, outgoing)
try {
/**
* If content-encoding is set, we assume that the response should be not decoded.
* Else if transfer-encoding is set, we assume that the response should be streamed.
* Else if content-length is set, we assume that the response content has been taken care of.
* Else if x-accel-buffering is set to no, we assume that the response should be streamed.
* Else if content-type is not application/json nor text/* but can be text/event-stream,
* we assume that the response should be streamed.
*/
if (
contentEncoding ||
transferEncoding ||
contentLength ||
/^no$/i.test(buffering) ||
!/^(application\/json\b|text\/(?!event-stream\b))/i.test(contentType)
) {
await writeReadableStreamToWritable(res.body, outgoing)
} else {
const text = await res.text()
outgoing.setHeader('Content-Length', Buffer.byteLength(text))
outgoing.end(text)
}
} catch (e: unknown) {
// try to catch any error, to avoid crash
console.error(e)
const err = e instanceof Error ? e : new Error('unknown error', { cause: e })
// destroy error must accept an instance of Error
outgoing.destroy(err)
}
} else {
outgoing.end()
Expand Down
37 changes: 17 additions & 20 deletions src/stream.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,27 @@
import type { Writable } from 'node:stream'

export async function writeReadableStreamToWritable(stream: ReadableStream, writable: Writable) {
let reader = stream.getReader()
const reader = stream.getReader()

async function read() {
let { done, value } = await reader.read()
function onClose() {
reader.cancel(new Error('Response writer closed'))
}

if (done) {
writable.end()
return
}
writable.once('close', onClose)

writable.write(value)
try {
while (true) {
const { done, value } = await reader.read()

await read()
}
if (done) {
writable.end()
return
}

try {
await read()
} catch (error: any) {
writable.destroy(error)
throw error
writable.write(value)
}
} finally {
writable.off('close', onClose)
reader.releaseLock()
}
}

/**
* Credits:
* - https://github.com/remix-run/remix/blob/e77e2eb/packages/remix-node/stream.ts
*/
86 changes: 86 additions & 0 deletions test/server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -257,3 +257,89 @@ describe('Basic Auth Middleware', () => {
expect(res.text).toBe('auth')
})
})

describe('Stream and non-stream response', () => {
const app = new Hono()

app.get('/json', (c) => c.json({ foo: 'bar' }))
app.get('/text', (c) => c.text('Hello!'))
app.get('/json-stream', (c) => {
c.header('x-accel-buffering', 'no')
return c.json({ foo: 'bar' })
})
app.get('/stream', (c) => {
const stream = new ReadableStream({
async start(controller) {
controller.enqueue('data: Hello!\n\n')
await new Promise((resolve) => setTimeout(resolve, 100))
controller.enqueue('data: end\n\n')
controller.close()
}
})

c.header('Content-Type', 'text/event-stream; charset=utf-8')
return c.body(stream)
})

app.get('/error-stream', (c) => {
const stream = new ReadableStream({
async start(controller) {
controller.enqueue('data: Hello!\n\n')
await new Promise((resolve) => setTimeout(resolve, 100))
controller.enqueue('data: end\n\n')
controller.error(new Error('test'))
}
})

c.header('Content-Type', 'text/event-stream; charset=utf-8')
return c.body(stream)
})

const server = createAdaptorServer(app)

it('Should return JSON body', async () => {
const res = await request(server).get('/json')
expect(res.status).toBe(200)
expect(res.headers['content-length']).toMatch('13')
expect(res.headers['content-type']).toMatch(/application\/json/)
expect(JSON.parse(res.text)).toEqual({ foo: 'bar' })
})

it('Should return text body', async () => {
const res = await request(server).get('/text')
expect(res.status).toBe(200)
expect(res.headers['content-length']).toMatch('6')
expect(res.headers['content-type']).toMatch(/text\/plain/)
expect(res.text).toBe('Hello!')
})

it('Should return JSON body - stream', async () => {
const res = await request(server).get('/json-stream')
expect(res.status).toBe(200)
expect(res.headers['content-length']).toBeUndefined()
expect(res.headers['content-type']).toMatch(/application\/json/)
expect(res.headers['transfer-encoding']).toMatch(/chunked/)
expect(JSON.parse(res.text)).toEqual({ foo: 'bar' })
})

it('Should return text body - stream', async () => {
const res = await request(server).get('/stream').parse((res, fn) => {
const chunks: string[] = ['data: Hello!\n\n', 'data: end\n\n']
let index = 0
res.on('data', (chunk) => {
const str = chunk.toString()
expect(str).toBe(chunks[index++])
})
res.on('end', () => fn(null, ''))
})
expect(res.status).toBe(200)
expect(res.headers['content-length']).toBeUndefined()
expect(res.headers['content-type']).toMatch(/text\/event-stream/)
expect(res.headers['transfer-encoding']).toMatch(/chunked/)
})

it('Should return error - stream without app crashing', async () => {
const result = request(server).get('/error-stream')
await expect(result).rejects.toThrow('aborted')
})
})
2 changes: 1 addition & 1 deletion tsconfig.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"compilerOptions": {
"target": "es2020",
"target": "es2022",
"module": "commonjs",
"declaration": true,
"moduleResolution": "Node",
Expand Down

0 comments on commit b0c4161

Please sign in to comment.