-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathldes-pipeline.ts
79 lines (73 loc) · 2.31 KB
/
ldes-pipeline.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import { pipeline } from "stream/promises";
import { newEngine, LDESClient, State } from "@treecg/actor-init-ldes-client";
import * as RDF from "rdf-js";
import { NamedNode } from "rdf-js";
import { extractEndpointHeadersFromEnv } from "./rdf-utils";
import { LDES_ENDPOINT_HEADER_PREFIX, PERSIST_STATE } from "./config";
import { fetchState, updateState } from "./sparql-queries";
import MemberProcessor from "./member-processor";
export interface ConfigurableLDESOptions {
pollingInterval?: number;
mimeType?: string;
dereferenceMembers?: boolean;
requestsPerMinute?: number;
}
interface LDESOptions {
representation: string
mimeType: string
requestHeaders: { [key: string]: number | string | string[] }
emitMemberOnce: boolean,
disableSynchronization: boolean,
pollingInterval?: number;
dereferenceMembers?: boolean;
requestsPerMinute?: number;
}
export type ConsumerArgs = {
datasetIri: NamedNode
endpoint: string
ldesOptions?: ConfigurableLDESOptions;
};
export type Member = {
id: RDF.Term;
quads: RDF.Quad[];
};
export default class LdesPipeline {
private client: LDESClient;
private endpoint: string;
private datasetIri: NamedNode;
private ldesOptions : LDESOptions;
constructor ({ endpoint, datasetIri, ldesOptions }: ConsumerArgs) {
this.endpoint = endpoint;
this.client = newEngine();
const defaultOptions = {
representation: "Quads",
mimeType: "application/ld+json",
requestHeaders: extractEndpointHeadersFromEnv(LDES_ENDPOINT_HEADER_PREFIX),
emitMemberOnce: true,
disableSynchronization: true
};
this.ldesOptions = { ...defaultOptions, ...ldesOptions };
this.datasetIri = datasetIri;
}
async consumeStream () {
const lastState = PERSIST_STATE ? await fetchState(this.datasetIri) : undefined;
try {
const ldesStream = this.client.createReadStream(
this.endpoint,
// @ts-ignore
this.ldesOptions,
lastState as State | undefined
);
const memberProcessor = new MemberProcessor();
await pipeline(
ldesStream,
memberProcessor
);
if (PERSIST_STATE) { await updateState(this.datasetIri, ldesStream.exportState()); }
console.log("finished processing stream");
} catch (e) {
console.log("processing stream failed");
console.error(e);
}
}
}