
Commit

Changes made
nkalupahana committed Mar 10, 2025
1 parent 86eb676 commit 0c118a3
Showing 5 changed files with 198 additions and 118 deletions.
53 changes: 53 additions & 0 deletions scheduled-services/package-lock.json

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions scheduled-services/package.json
@@ -25,6 +25,7 @@
"fluent-ffmpeg": "^2.1.3",
"lodash": "^4.17.21",
"luxon": "^3.2.1",
"p-queue": "^8.1.0",
"typescript": "^4.9.3",
"underscore": "^1.13.6",
"uuid": "^10.0.0"
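The new p-queue dependency is what bi.ts (below) uses to cap how many Realtime Database reads run at once. For reference, a minimal sketch of that pattern — illustrative names and values, not taken from this commit:

import PQueue from "p-queue";

// Never run more than 20 tasks at once; additional tasks wait in the queue.
const queue = new PQueue({ concurrency: 20 });

// Stand-in for a per-item async read (hypothetical; in bi.ts this is an RTDB fetch).
const fetchOne = async (id: string): Promise<void> => {
    console.log("fetched", id);
};

(async () => {
    const ids = ["a", "b", "c"];
    // queue.add() returns a promise for the task's completion, so Promise.all
    // resolves once every queued task has finished.
    await Promise.all(ids.map((id) => queue.add(() => fetchOne(id))));
})();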
251 changes: 139 additions & 112 deletions scheduled-services/src/bi.ts
@@ -3,9 +3,62 @@ import { DateTime } from "luxon";
import { BigQuery } from "@google-cloud/bigquery";
import { Storage } from "@google-cloud/storage";
import { getFirestore } from "firebase-admin/firestore";
import { DateTimeMap, NumberMap } from "./helpers.js";
import { AnyMap, DateTimeMap, NumberMap } from "./helpers.js";
import { Database, getDatabase } from "firebase-admin/database";
import PQueue from "p-queue";

const getInfoForUID = async (db: Database, dbdata: AnyMap, uid: string) => {
try {
const [infoSnap, lastUpdatedSnap] = await Promise.all([
db.ref(`${uid}/info`).get(),
db.ref(`${uid}/lastUpdated`).get(),
]);

dbdata[uid] = {
info: infoSnap.val(),
lastUpdated: lastUpdatedSnap.val(),
}
return;
} catch (error) {
console.log(uid);
throw error;
}
}

const getSimplifiedDB = async () => {
// Get a Google OAuth access token from the Compute Engine metadata server
const response = await fetch("http://metadata/computeMetadata/v1/instance/service-accounts/default/token", {
method: "GET",
headers: {
"Metadata-Flavor": "Google"
}
})
const accessToken = (await response.json()).access_token;

// Get all UIDs in the database via a shallow REST query (top-level keys only, no values)
const allUsers = await fetch(`https://getbaselineapp-default-rtdb.firebaseio.com/.json?shallow=true&access_token=${accessToken}`)
const uids = Object.keys(await allUsers.json());
const database = getDatabase();
const db = {};
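    // Fetch /info and /lastUpdated for every UID, with at most 20 reads in flight at a time.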
const queue = new PQueue({ concurrency: 20 });
await Promise.all(
uids.map((uid) => queue.add(() => getInfoForUID(database, db, uid)))
);

return db;
}

export const loadBasicBIData = async () => {
let db: AnyMap = {};
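    // Hour 0 (midnight) runs the full export from the entire database; every other
    // hourly run takes the lighter "simplified" path below.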
const simplifiedBI = (new Date()).getHours() !== 0;
console.log("Simplified BI:", simplifiedBI);

if (simplifiedBI) {
db = await getSimplifiedDB();
} else {
db = (await getDatabase().ref("/").get()).val();
}

export const loadBasicBIData = async (db: any) => {
const bigquery = new BigQuery();
const storage = new Storage();

@@ -36,11 +89,6 @@ export const loadBasicBIData = async (db: any) => {
].join(","));
}

let granite: string[] = [];
const addGranite = (userId: string, graniteUserId: string, timestamps: string) => {
granite.push([userId, graniteUserId, timestamps].join(","));
}

let gapFund: string[] = [];

let convData: string[] = [];
@@ -101,7 +149,10 @@ export const loadBasicBIData = async (db: any) => {
daysUsed = Math.round(DateTime.fromMillis(lastUpdated).diff(creationTime, "days").days);
}
addConversion(creationTime?.toMillis() ?? 0, "signed_up", db[userId].info?.utm_source, db[userId].info?.utm_campaign, userId, lastUpdated, daysUsed);


// This is all the data we have for simplified BI
if (simplifiedBI) continue;

for (let timestamp in db[userId]["logs"] ?? {}) {
addAction(userId, timestamp, "moodLog");

@@ -125,91 +176,36 @@ export const loadBasicBIData = async (db: any) => {
if ("gapFund" in db[userId]) {
gapFund.push(userId);
}

if ("partners" in db[userId]) {
if ("granite" in db[userId]["partners"]) {
let [_, payload, __] = db[userId]["partners"]["granite"]["id_token"].split(".")
payload = JSON.parse(Buffer.from(payload, "base64").toString("utf-8"));
addGranite(
userId,
payload.sub,
Object.keys(db[userId]["logs"] ?? {}).join(";")
);
}
}
}

// Get Firestore conversion data
const firestore = getFirestore();
const convSnapshot = await firestore.collection("conversions").get();
const conversions = convSnapshot.docs.map(doc => doc.data());
for (const convs of conversions) {
for (const conv of Object.values(convs)) {
if (conv.uid) continue; // Added in user step from RTDB data
addConversion(conv.timestamp.toMillis(), conv.state, conv.utm_source, conv.utm_campaign, conv.uid, undefined, undefined);
if (!simplifiedBI) {
// Get Firestore conversion data
const firestore = getFirestore();
const convSnapshot = await firestore.collection("conversions").get();
const conversions = convSnapshot.docs.map(doc => doc.data());
for (const convs of conversions) {
for (const conv of Object.values(convs)) {
if (conv.uid) continue; // Added in user step from RTDB data
addConversion(conv.timestamp.toMillis(), conv.state, conv.utm_source, conv.utm_campaign, conv.uid, undefined, undefined);
}
}
}

// Send CSVs/data to storage
let promises = [];
promises.push(storage.bucket("baseline-bi").file("actions.csv").save(actions.join("\n")));
promises.push(storage.bucket("baseline-bi").file("lengths.csv").save(logLengths.join("\n")));
promises.push(storage.bucket("baseline-bi").file("gapfund.csv").save(gapFund.join("\n")));
promises.push(storage.bucket("baseline-bi").file("users.csv").save(users.join("\n")));
promises.push(storage.bucket("baseline-bi").file("accounts.csv").save(accounts.join("\n")));
promises.push(storage.bucket("baseline-bi").file("conversions.csv").save(convData.join("\n")));
promises.push(storage.bucket("baseline-bi").file("granite.csv").save(granite.join("\n")));
if (!simplifiedBI) {
promises.push(storage.bucket("baseline-bi").file("actions.csv").save(actions.join("\n")));
promises.push(storage.bucket("baseline-bi").file("lengths.csv").save(logLengths.join("\n")));
promises.push(storage.bucket("baseline-bi").file("gapfund.csv").save(gapFund.join("\n")));
promises.push(storage.bucket("baseline-bi").file("conversions.csv").save(convData.join("\n")));
}

await Promise.all(promises);


promises = [];
// Send actions CSV to BigQuery
const actionsMetadata = {
sourceFormat: "CSV",
schema: {
fields: [
{ name: "timestamp", type: "INTEGER" },
{ name: "userId", type: "STRING" },
{ name: "date", type: "DATE" },
{ name: "time", type: "TIME" },
{ name: "action", type: "STRING" }
],
},
location: "US",
writeDisposition: "WRITE_TRUNCATE"
};
promises.push(bigquery.dataset("bi").table("actions").load(storage.bucket("baseline-bi").file("actions.csv"), actionsMetadata));

// Send lengths CSV to BigQuery
const lengthsMetadata = {
sourceFormat: "CSV",
schema: {
fields: [
{ name: "timestamp", type: "INTEGER" },
{ name: "userId", type: "STRING" },
{ name: "len", type: "INTEGER" },
{ name: "bucket", type: "STRING" }
],
},
location: "US",
writeDisposition: "WRITE_TRUNCATE"
};
promises.push(bigquery.dataset("bi").table("log_length").load(storage.bucket("baseline-bi").file("lengths.csv"), lengthsMetadata));

// Send gap fund CSV to BigQuery
const gapMetadata = {
sourceFormat: "CSV",
schema: {
fields: [
{ name: "userId", type: "STRING" }
]
},
location: "US",
writeDisposition: "WRITE_TRUNCATE"
};
promises.push(bigquery.dataset("bi").table("gap_fund").load(storage.bucket("baseline-bi").file("gapfund.csv"), gapMetadata));

// Send users CSV to BigQuery
const usersMetadata = {
sourceFormat: "CSV",
@@ -243,41 +239,72 @@ export const loadBasicBIData = async (db: any) => {
};
promises.push(bigquery.dataset("bi").table("accounts").load(storage.bucket("baseline-bi").file("accounts.csv"), accountsMetadata));

// Send conversions CSV to BigQuery
const convMetadata = {
sourceFormat: "CSV",
schema: {
fields: [
{ name: "timestamp", type: "INTEGER" },
{ name: "state", type: "STRING" },
{ name: "utm_source", type: "STRING" },
{ name: "utm_campaign", type: "STRING" },
{ name: "userId", type: "STRING" },
{ name: "lastUpdated", type: "INTEGER" },
{ name: "daysUsed", type: "INTEGER" }
]
},
location: "US",
writeDisposition: "WRITE_TRUNCATE"
};
promises.push(bigquery.dataset("bi").table("conversions").load(storage.bucket("baseline-bi").file("conversions.csv"), convMetadata));
if (!simplifiedBI) {
// Send actions CSV to BigQuery
const actionsMetadata = {
sourceFormat: "CSV",
schema: {
fields: [
{ name: "timestamp", type: "INTEGER" },
{ name: "userId", type: "STRING" },
{ name: "date", type: "DATE" },
{ name: "time", type: "TIME" },
{ name: "action", type: "STRING" }
],
},
location: "US",
writeDisposition: "WRITE_TRUNCATE"
};
promises.push(bigquery.dataset("bi").table("actions").load(storage.bucket("baseline-bi").file("actions.csv"), actionsMetadata));

await Promise.all(promises);
// Send lengths CSV to BigQuery
const lengthsMetadata = {
sourceFormat: "CSV",
schema: {
fields: [
{ name: "timestamp", type: "INTEGER" },
{ name: "userId", type: "STRING" },
{ name: "len", type: "INTEGER" },
{ name: "bucket", type: "STRING" }
],
},
location: "US",
writeDisposition: "WRITE_TRUNCATE"
};
promises.push(bigquery.dataset("bi").table("log_length").load(storage.bucket("baseline-bi").file("lengths.csv"), lengthsMetadata));

// Send granite CSV to BigQuery
const graniteMetadata = {
sourceFormat: "CSV",
schema: {
fields: [
{ name: "baselineUserId", type: "STRING" },
{ name: "graniteUserId", type: "STRING" },
{ name: "timestamps", type: "STRING" }
]
},
location: "US",
writeDisposition: "WRITE_TRUNCATE"
};
promises.push(bigquery.dataset("bi_granite").table("granite_main").load(storage.bucket("baseline-bi").file("granite.csv"), graniteMetadata));
// Send gap fund CSV to BigQuery
const gapMetadata = {
sourceFormat: "CSV",
schema: {
fields: [
{ name: "userId", type: "STRING" }
]
},
location: "US",
writeDisposition: "WRITE_TRUNCATE"
};
promises.push(bigquery.dataset("bi").table("gap_fund").load(storage.bucket("baseline-bi").file("gapfund.csv"), gapMetadata));

// Send conversions CSV to BigQuery
const convMetadata = {
sourceFormat: "CSV",
schema: {
fields: [
{ name: "timestamp", type: "INTEGER" },
{ name: "state", type: "STRING" },
{ name: "utm_source", type: "STRING" },
{ name: "utm_campaign", type: "STRING" },
{ name: "userId", type: "STRING" },
{ name: "lastUpdated", type: "INTEGER" },
{ name: "daysUsed", type: "INTEGER" }
]
},
location: "US",
writeDisposition: "WRITE_TRUNCATE"
};
promises.push(bigquery.dataset("bi").table("conversions").load(storage.bucket("baseline-bi").file("conversions.csv"), convMetadata));
}

await Promise.all(promises);
}
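With this change, loadBasicBIData no longer takes the database contents as a parameter; it decides at runtime whether to do the full or simplified load. A hypothetical caller — file name and surrounding wiring assumed, not shown in this commit — could now look like:

// Hypothetical entry point (not part of this commit). Assumes firebase-admin
// has been initialized (initializeApp) before this runs.
import { loadBasicBIData } from "./bi.js";

const run = async () => {
    // At hour 0 this performs the full export; at any other hour it performs
    // the lighter "simplified" load (users and accounts only).
    await loadBasicBIData();
};

run().catch((err) => {
    console.error("BI load failed:", err);
    process.exit(1);
});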