Skip to content

Commit 0a3c698

Browse files
committed
Fix issue of 'too many handles' error when downloading a large file
1 parent 3f64420 commit 0a3c698

File tree

2 files changed

+83
-18
lines changed

2 files changed

+83
-18
lines changed

src/common/persistence/FSExtentStore.ts

+11-18
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import {
22
close,
3-
createReadStream,
43
createWriteStream,
54
fdatasync,
65
mkdir,
@@ -30,6 +29,7 @@ import IExtentStore, {
3029
} from "./IExtentStore";
3130
import IOperationQueue from "./IOperationQueue";
3231
import OperationQueue from "./OperationQueue";
32+
import FileLazyReadStream from "./FileLazyReadStream";
3333

3434
const statAsync = promisify(stat);
3535
const mkdirAsync = promisify(mkdir);
@@ -333,26 +333,19 @@ export default class FSExtentStore implements IExtentStore {
333333
const op = () =>
334334
new Promise<NodeJS.ReadableStream>((resolve, reject) => {
335335
this.logger.verbose(
336-
`FSExtentStore:readExtent() Creating read stream. LocationId:${persistencyId} extentId:${
337-
extentChunk.id
338-
} path:${path} offset:${extentChunk.offset} count:${
339-
extentChunk.count
336+
`FSExtentStore:readExtent() Creating read stream. LocationId:${persistencyId} extentId:${extentChunk.id
337+
} path:${path} offset:${extentChunk.offset} count:${extentChunk.count
340338
} end:${extentChunk.offset + extentChunk.count - 1}`,
341339
contextId
342340
);
343-
const stream = createReadStream(path, {
344-
start: extentChunk.offset,
345-
end: extentChunk.offset + extentChunk.count - 1
346-
}).on("close", () => {
347-
this.logger.verbose(
348-
`FSExtentStore:readExtent() Read stream closed. LocationId:${persistencyId} extentId:${
349-
extentChunk.id
350-
} path:${path} offset:${extentChunk.offset} count:${
351-
extentChunk.count
352-
} end:${extentChunk.offset + extentChunk.count - 1}`,
353-
contextId
354-
);
355-
});
341+
const stream = new FileLazyReadStream(
342+
path,
343+
extentChunk.offset,
344+
extentChunk.offset + extentChunk.count - 1,
345+
this.logger,
346+
persistencyId,
347+
extentChunk.id,
348+
contextId);
356349
resolve(stream);
357350
});
358351

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import { ReadStream, createReadStream } from "fs";
2+
import { Readable } from "stream";
3+
import ILogger from "../ILogger";
4+
5+
6+
export default class FileLazyReadStream extends Readable {
7+
private extentStream: ReadStream | undefined;
8+
constructor(
9+
private readonly extentPath: string,
10+
private readonly start: number,
11+
private readonly end: number,
12+
private readonly logger: ILogger,
13+
private readonly persistencyId: string,
14+
private readonly extentId: string,
15+
private readonly contextId?: string) {
16+
super();
17+
}
18+
19+
public _read(): void {
20+
if (this.extentStream === undefined) {
21+
this.extentStream = createReadStream(this.extentPath, {
22+
start: this.start,
23+
end: this.end
24+
}).on("close", () => {
25+
this.logger.verbose(
26+
`FSExtentStore:readExtent() Read stream closed. LocationId:${this.persistencyId} extentId:${this.extentId
27+
} path:${this.extentPath} offset:${this.start} end:${this.end}`,
28+
this.contextId
29+
);
30+
});
31+
this.setSourceEventHandlers();
32+
}
33+
this.extentStream?.resume();
34+
}
35+
36+
private setSourceEventHandlers() {
37+
this.extentStream?.on("data", this.sourceDataHandler);
38+
this.extentStream?.on("end", this.sourceErrorOrEndHandler);
39+
this.extentStream?.on("error", this.sourceErrorOrEndHandler);
40+
}
41+
42+
private removeSourceEventHandlers() {
43+
this.extentStream?.removeListener("data", this.sourceDataHandler);
44+
this.extentStream?.removeListener("end", this.sourceErrorOrEndHandler);
45+
this.extentStream?.removeListener("error", this.sourceErrorOrEndHandler);
46+
}
47+
48+
private sourceDataHandler = (data: Buffer) => {
49+
if (!this.push(data)) {
50+
this.extentStream?.pause();
51+
}
52+
}
53+
54+
private sourceErrorOrEndHandler = (err?: Error) => {
55+
if (err && err.name === "AbortError") {
56+
this.destroy(err);
57+
return;
58+
}
59+
60+
this.removeSourceEventHandlers();
61+
this.push(null);
62+
this.destroy(err);
63+
}
64+
65+
_destroy(error: Error | null, callback: (error?: Error) => void): void {
66+
// remove listener from source and release source
67+
//this.removeSourceEventHandlers();
68+
(this.extentStream as Readable).destroy();
69+
70+
callback(error === null ? undefined : error);
71+
}
72+
}

0 commit comments

Comments
 (0)