Skip to content

Commit d8b3e11

Browse files
committed
Fix issue of "too many handles" error when downloading a large blob
1 parent 3f64420 commit d8b3e11

File tree

4 files changed

+98
-26
lines changed

4 files changed

+98
-26
lines changed

src/common/persistence/FSExtentStore.ts

+11-18
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import {
22
close,
3-
createReadStream,
43
createWriteStream,
54
fdatasync,
65
mkdir,
@@ -30,6 +29,7 @@ import IExtentStore, {
3029
} from "./IExtentStore";
3130
import IOperationQueue from "./IOperationQueue";
3231
import OperationQueue from "./OperationQueue";
32+
import FileLazyReadStream from "./FileLazyReadStream";
3333

3434
const statAsync = promisify(stat);
3535
const mkdirAsync = promisify(mkdir);
@@ -333,26 +333,19 @@ export default class FSExtentStore implements IExtentStore {
333333
const op = () =>
334334
new Promise<NodeJS.ReadableStream>((resolve, reject) => {
335335
this.logger.verbose(
336-
`FSExtentStore:readExtent() Creating read stream. LocationId:${persistencyId} extentId:${
337-
extentChunk.id
338-
} path:${path} offset:${extentChunk.offset} count:${
339-
extentChunk.count
336+
`FSExtentStore:readExtent() Creating read stream. LocationId:${persistencyId} extentId:${extentChunk.id
337+
} path:${path} offset:${extentChunk.offset} count:${extentChunk.count
340338
} end:${extentChunk.offset + extentChunk.count - 1}`,
341339
contextId
342340
);
343-
const stream = createReadStream(path, {
344-
start: extentChunk.offset,
345-
end: extentChunk.offset + extentChunk.count - 1
346-
}).on("close", () => {
347-
this.logger.verbose(
348-
`FSExtentStore:readExtent() Read stream closed. LocationId:${persistencyId} extentId:${
349-
extentChunk.id
350-
} path:${path} offset:${extentChunk.offset} count:${
351-
extentChunk.count
352-
} end:${extentChunk.offset + extentChunk.count - 1}`,
353-
contextId
354-
);
355-
});
341+
const stream = new FileLazyReadStream(
342+
path,
343+
extentChunk.offset,
344+
extentChunk.offset + extentChunk.count - 1,
345+
this.logger,
346+
persistencyId,
347+
extentChunk.id,
348+
contextId);
356349
resolve(stream);
357350
});
358351

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import { ReadStream, createReadStream } from "fs";
2+
import { Readable } from "stream";
3+
import ILogger from "../ILogger";
4+
5+
6+
export default class FileLazyReadStream extends Readable {
7+
private extentStream: ReadStream | undefined;
8+
constructor(
9+
private readonly extentPath: string,
10+
private readonly start: number,
11+
private readonly end: number,
12+
private readonly logger: ILogger,
13+
private readonly persistencyId: string,
14+
private readonly extentId: string,
15+
private readonly contextId?: string) {
16+
super();
17+
}
18+
19+
public _read(): void {
20+
if (this.extentStream === undefined) {
21+
this.extentStream = createReadStream(this.extentPath, {
22+
start: this.start,
23+
end: this.end
24+
}).on("close", () => {
25+
this.logger.verbose(
26+
`FSExtentStore:readExtent() Read stream closed. LocationId:${this.persistencyId} extentId:${this.extentId
27+
} path:${this.extentPath} offset:${this.start} end:${this.end}`,
28+
this.contextId
29+
);
30+
});
31+
this.setSourceEventHandlers();
32+
}
33+
this.extentStream?.resume();
34+
}
35+
36+
private setSourceEventHandlers() {
37+
this.extentStream?.on("data", this.sourceDataHandler);
38+
this.extentStream?.on("end", this.sourceErrorOrEndHandler);
39+
this.extentStream?.on("error", this.sourceErrorOrEndHandler);
40+
}
41+
42+
private removeSourceEventHandlers() {
43+
this.extentStream?.removeListener("data", this.sourceDataHandler);
44+
this.extentStream?.removeListener("end", this.sourceErrorOrEndHandler);
45+
this.extentStream?.removeListener("error", this.sourceErrorOrEndHandler);
46+
}
47+
48+
private sourceDataHandler = (data: Buffer) => {
49+
if (!this.push(data)) {
50+
this.extentStream?.pause();
51+
}
52+
}
53+
54+
private sourceErrorOrEndHandler = (err?: Error) => {
55+
if (err && err.name === "AbortError") {
56+
this.destroy(err);
57+
return;
58+
}
59+
60+
this.removeSourceEventHandlers();
61+
this.push(null);
62+
this.destroy(err);
63+
}
64+
65+
_destroy(error: Error | null, callback: (error?: Error) => void): void {
66+
// remove listener from source and release source
67+
//this.removeSourceEventHandlers();
68+
(this.extentStream as Readable).destroy();
69+
70+
callback(error === null ? undefined : error);
71+
}
72+
}

tests/blob/blockblob.highlevel.test.ts

+4-4
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ describe("BlockBlobHighlevel", () => {
179179
aborter.abort();
180180
}
181181
});
182-
} catch (err) {}
182+
} catch (err) { }
183183
assert.ok(eventTriggered);
184184
}).timeout(timeoutForLargeFileUploadingTest);
185185

@@ -198,7 +198,7 @@ describe("BlockBlobHighlevel", () => {
198198
aborter.abort();
199199
}
200200
});
201-
} catch (err) {}
201+
} catch (err) { }
202202
assert.ok(eventTriggered);
203203
});
204204

@@ -260,7 +260,7 @@ describe("BlockBlobHighlevel", () => {
260260
abortSignal: AbortController.timeout(1)
261261
});
262262
assert.fail();
263-
} catch (err:any) {
263+
} catch (err: any) {
264264
assert.ok((err.message as string).toLowerCase().includes("abort"));
265265
}
266266
}).timeout(timeoutForLargeFileUploadingTest);
@@ -314,7 +314,7 @@ describe("BlockBlobHighlevel", () => {
314314
aborter.abort();
315315
}
316316
});
317-
} catch (err) {}
317+
} catch (err) { }
318318
assert.ok(eventTriggered);
319319
}).timeout(timeoutForLargeFileUploadingTest);
320320

tests/testutils.ts

+11-4
Original file line numberDiff line numberDiff line change
@@ -145,15 +145,15 @@ export async function createRandomLocalFile(
145145

146146
ws.on("open", () => {
147147
// tslint:disable-next-line:no-empty
148-
while (offsetInMB++ < blockNumber && ws.write(randomValueHex())) {}
148+
while (offsetInMB++ < blockNumber && ws.write(randomValueHex())) { }
149149
if (offsetInMB >= blockNumber) {
150150
ws.end();
151151
}
152152
});
153153

154154
ws.on("drain", () => {
155155
// tslint:disable-next-line:no-empty
156-
while (offsetInMB++ < blockNumber && ws.write(randomValueHex())) {}
156+
while (offsetInMB++ < blockNumber && ws.write(randomValueHex())) { }
157157
if (offsetInMB >= blockNumber) {
158158
ws.end();
159159
}
@@ -168,9 +168,16 @@ export async function readStreamToLocalFile(
168168
file: string
169169
) {
170170
return new Promise<void>((resolve, reject) => {
171-
const ws = createWriteStream(file);
172-
rs.pipe(ws);
171+
const ws = createWriteStream(file, { autoClose: true });
172+
rs.on("data", (data: Buffer) => {
173+
ws.write(data, (err) => {
174+
if (err) { reject(err); }
175+
})
176+
})
173177
rs.on("error", reject);
178+
rs.on("close", () => {
179+
ws.end();
180+
});
174181
ws.on("error", reject);
175182
ws.on("finish", resolve);
176183
});

0 commit comments

Comments
 (0)