diff --git a/scheduled-services/package-lock.json b/scheduled-services/package-lock.json index 556e5183..3ab8807a 100644 --- a/scheduled-services/package-lock.json +++ b/scheduled-services/package-lock.json @@ -18,6 +18,7 @@ "fluent-ffmpeg": "^2.1.3", "lodash": "^4.17.21", "luxon": "^3.2.1", + "p-queue": "^8.1.0", "typescript": "^4.9.3", "underscore": "^1.13.6", "uuid": "^10.0.0" @@ -1219,6 +1220,11 @@ "node": ">=6" } }, + "node_modules/eventemitter3": { + "version": "5.0.1", + "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-5.0.1.tgz", + "integrity": "sha512-GWkBvjiSZK87ELrYOSESUYeVIc9mvLLf/nXalMOS5dYrgZq9o5OVkbZAVM06CVxYsCwH9BDZFPlQTlPA1j4ahA==" + }, "node_modules/events": { "version": "3.3.0", "resolved": "https://registry.npmjs.org/events/-/events-3.3.0.tgz", @@ -2546,6 +2552,32 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/p-queue": { + "version": "8.1.0", + "resolved": "https://registry.npmjs.org/p-queue/-/p-queue-8.1.0.tgz", + "integrity": "sha512-mxLDbbGIBEXTJL0zEx8JIylaj3xQ7Z/7eEVjcF9fJX4DBiH9oqe+oahYnlKKxm0Ci9TlWTyhSHgygxMxjIB2jw==", + "dependencies": { + "eventemitter3": "^5.0.1", + "p-timeout": "^6.1.2" + }, + "engines": { + "node": ">=18" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, + "node_modules/p-queue/node_modules/p-timeout": { + "version": "6.1.4", + "resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-6.1.4.tgz", + "integrity": "sha512-MyIV3ZA/PmyBN/ud8vV9XzwTrNtR4jFrObymZYnZqMmW0zA8Z17vnT0rBgFE/TlohB+YCHqXMgZzb3Csp49vqg==", + "engines": { + "node": ">=14.16" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/p-timeout": { "version": "3.2.0", "resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-3.2.0.tgz", @@ -4253,6 +4285,11 @@ "resolved": "https://registry.npmjs.org/event-target-shim/-/event-target-shim-5.0.1.tgz", "integrity": "sha512-i/2XbnSz/uxRCU6+NdVJgKWDTM427+MqYbkQzD321DuCQJUqOuJKIA0IM2+W2xtYHdKOmZ4dR6fExsd4SXL+WQ==" }, + "eventemitter3": { + "version": "5.0.1", + "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-5.0.1.tgz", + "integrity": "sha512-GWkBvjiSZK87ELrYOSESUYeVIc9mvLLf/nXalMOS5dYrgZq9o5OVkbZAVM06CVxYsCwH9BDZFPlQTlPA1j4ahA==" + }, "events": { "version": "3.3.0", "resolved": "https://registry.npmjs.org/events/-/events-3.3.0.tgz", @@ -5264,6 +5301,22 @@ "yocto-queue": "^0.1.0" } }, + "p-queue": { + "version": "8.1.0", + "resolved": "https://registry.npmjs.org/p-queue/-/p-queue-8.1.0.tgz", + "integrity": "sha512-mxLDbbGIBEXTJL0zEx8JIylaj3xQ7Z/7eEVjcF9fJX4DBiH9oqe+oahYnlKKxm0Ci9TlWTyhSHgygxMxjIB2jw==", + "requires": { + "eventemitter3": "^5.0.1", + "p-timeout": "^6.1.2" + }, + "dependencies": { + "p-timeout": { + "version": "6.1.4", + "resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-6.1.4.tgz", + "integrity": "sha512-MyIV3ZA/PmyBN/ud8vV9XzwTrNtR4jFrObymZYnZqMmW0zA8Z17vnT0rBgFE/TlohB+YCHqXMgZzb3Csp49vqg==" + } + } + }, "p-timeout": { "version": "3.2.0", "resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-3.2.0.tgz", diff --git a/scheduled-services/package.json b/scheduled-services/package.json index a035ccd7..6bdfeb4d 100644 --- a/scheduled-services/package.json +++ b/scheduled-services/package.json @@ -25,6 +25,7 @@ "fluent-ffmpeg": "^2.1.3", "lodash": "^4.17.21", "luxon": "^3.2.1", + "p-queue": "^8.1.0", "typescript": "^4.9.3", "underscore": "^1.13.6", "uuid": "^10.0.0" diff --git a/scheduled-services/src/bi.ts b/scheduled-services/src/bi.ts index 252cecea..3ba2b07e 100644 --- a/scheduled-services/src/bi.ts +++ b/scheduled-services/src/bi.ts @@ -3,9 +3,62 @@ import { DateTime } from "luxon"; import { BigQuery } from "@google-cloud/bigquery"; import { Storage } from "@google-cloud/storage"; import { getFirestore } from "firebase-admin/firestore"; -import { DateTimeMap, NumberMap } from "./helpers.js"; +import { AnyMap, DateTimeMap, NumberMap } from "./helpers.js"; +import { Database, getDatabase } from "firebase-admin/database"; +import PQueue from "p-queue"; + +const getInfoForUID = async (db: Database, dbdata: AnyMap, uid: string) => { + try { + const [infoSnap, lastUpdatedSnap] = await Promise.all([ + db.ref(`${uid}/info`).get(), + db.ref(`${uid}/lastUpdated`).get(), + ]); + + dbdata[uid] = { + info: infoSnap.val(), + lastUpdated: lastUpdatedSnap.val(), + } + return; + } catch (error) { + console.log(uid); + throw error; + } +} + +const getSimplifiedDB = async () => { + // Get Google auth token + const response = await fetch("http://metadata/computeMetadata/v1/instance/service-accounts/default/token", { + method: "GET", + headers: { + "Metadata-Flavor": "Google" + } + }) + const accessToken = (await response.json()).access_token; + + // Get all UIDs in database + const allUsers = await fetch(`https://getbaselineapp-default-rtdb.firebaseio.com/.json?shallow=true&access_token=${accessToken}`) + const uids = Object.keys(await allUsers.json()); + const database = getDatabase(); + const db = {}; + const queue = new PQueue({ concurrency: 20 }); + await Promise.all( + uids.map((uid) => queue.add(() => getInfoForUID(database, db, uid))) + ); + + return db; +} + +export const loadBasicBIData = async () => { + let db: AnyMap = {}; + const simplifiedBI = (new Date()).getHours() !== 0; + console.log("Simplified BI:", simplifiedBI); + + if (simplifiedBI) { + db = await getSimplifiedDB(); + } else { + db = (await getDatabase().ref("/").get()).val(); + } -export const loadBasicBIData = async (db: any) => { const bigquery = new BigQuery(); const storage = new Storage(); @@ -36,11 +89,6 @@ export const loadBasicBIData = async (db: any) => { ].join(",")); } - let granite: string[] = []; - const addGranite = (userId: string, graniteUserId: string, timestamps: string) => { - granite.push([userId, graniteUserId, timestamps].join(",")); - } - let gapFund: string[] = []; let convData: string[] = []; @@ -101,7 +149,10 @@ export const loadBasicBIData = async (db: any) => { daysUsed = Math.round(DateTime.fromMillis(lastUpdated).diff(creationTime, "days").days); } addConversion(creationTime?.toMillis() ?? 0, "signed_up", db[userId].info?.utm_source, db[userId].info?.utm_campaign, userId, lastUpdated, daysUsed); - + + // This is all the data we have for simplified BI + if (simplifiedBI) continue; + for (let timestamp in db[userId]["logs"] ?? {}) { addAction(userId, timestamp, "moodLog"); @@ -125,91 +176,36 @@ export const loadBasicBIData = async (db: any) => { if ("gapFund" in db[userId]) { gapFund.push(userId); } - - if ("partners" in db[userId]) { - if ("granite" in db[userId]["partners"]) { - let [_, payload, __] = db[userId]["partners"]["granite"]["id_token"].split(".") - payload = JSON.parse(Buffer.from(payload, "base64").toString("utf-8")); - addGranite( - userId, - payload.sub, - Object.keys(db[userId]["logs"] ?? {}).join(";") - ); - } - } } - // Get Firestore conversion data - const firestore = getFirestore(); - const convSnapshot = await firestore.collection("conversions").get(); - const conversions = convSnapshot.docs.map(doc => doc.data()); - for (const convs of conversions) { - for (const conv of Object.values(convs)) { - if (conv.uid) continue; // Added in user step from RTDB data - addConversion(conv.timestamp.toMillis(), conv.state, conv.utm_source, conv.utm_campaign, conv.uid, undefined, undefined); + if (!simplifiedBI) { + // Get Firestore conversion data + const firestore = getFirestore(); + const convSnapshot = await firestore.collection("conversions").get(); + const conversions = convSnapshot.docs.map(doc => doc.data()); + for (const convs of conversions) { + for (const conv of Object.values(convs)) { + if (conv.uid) continue; // Added in user step from RTDB data + addConversion(conv.timestamp.toMillis(), conv.state, conv.utm_source, conv.utm_campaign, conv.uid, undefined, undefined); + } } } // Send CSVs/data to storage let promises = []; - promises.push(storage.bucket("baseline-bi").file("actions.csv").save(actions.join("\n"))); - promises.push(storage.bucket("baseline-bi").file("lengths.csv").save(logLengths.join("\n"))); - promises.push(storage.bucket("baseline-bi").file("gapfund.csv").save(gapFund.join("\n"))); promises.push(storage.bucket("baseline-bi").file("users.csv").save(users.join("\n"))); promises.push(storage.bucket("baseline-bi").file("accounts.csv").save(accounts.join("\n"))); - promises.push(storage.bucket("baseline-bi").file("conversions.csv").save(convData.join("\n"))); - promises.push(storage.bucket("baseline-bi").file("granite.csv").save(granite.join("\n"))); + if (!simplifiedBI) { + promises.push(storage.bucket("baseline-bi").file("actions.csv").save(actions.join("\n"))); + promises.push(storage.bucket("baseline-bi").file("lengths.csv").save(logLengths.join("\n"))); + promises.push(storage.bucket("baseline-bi").file("gapfund.csv").save(gapFund.join("\n"))); + promises.push(storage.bucket("baseline-bi").file("conversions.csv").save(convData.join("\n"))); + } await Promise.all(promises); promises = []; - // Send actions CSV to BigQuery - const actionsMetadata = { - sourceFormat: "CSV", - schema: { - fields: [ - { name: "timestamp", type: "INTEGER" }, - { name: "userId", type: "STRING" }, - { name: "date", type: "DATE" }, - { name: "time", type: "TIME" }, - { name: "action", type: "STRING" } - ], - }, - location: "US", - writeDisposition: "WRITE_TRUNCATE" - }; - promises.push(bigquery.dataset("bi").table("actions").load(storage.bucket("baseline-bi").file("actions.csv"), actionsMetadata)); - - // Send lengths CSV to BigQuery - const lengthsMetadata = { - sourceFormat: "CSV", - schema: { - fields: [ - { name: "timestamp", type: "INTEGER" }, - { name: "userId", type: "STRING" }, - { name: "len", type: "INTEGER" }, - { name: "bucket", type: "STRING" } - ], - }, - location: "US", - writeDisposition: "WRITE_TRUNCATE" - }; - promises.push(bigquery.dataset("bi").table("log_length").load(storage.bucket("baseline-bi").file("lengths.csv"), lengthsMetadata)); - - // Send gap fund CSV to BigQuery - const gapMetadata = { - sourceFormat: "CSV", - schema: { - fields: [ - { name: "userId", type: "STRING" } - ] - }, - location: "US", - writeDisposition: "WRITE_TRUNCATE" - }; - promises.push(bigquery.dataset("bi").table("gap_fund").load(storage.bucket("baseline-bi").file("gapfund.csv"), gapMetadata)); - // Send users CSV to BigQuery const usersMetadata = { sourceFormat: "CSV", @@ -243,41 +239,72 @@ export const loadBasicBIData = async (db: any) => { }; promises.push(bigquery.dataset("bi").table("accounts").load(storage.bucket("baseline-bi").file("accounts.csv"), accountsMetadata)); - // Send conversions CSV to BigQuery - const convMetadata = { - sourceFormat: "CSV", - schema: { - fields: [ - { name: "timestamp", type: "INTEGER" }, - { name: "state", type: "STRING" }, - { name: "utm_source", type: "STRING" }, - { name: "utm_campaign", type: "STRING" }, - { name: "userId", type: "STRING" }, - { name: "lastUpdated", type: "INTEGER" }, - { name: "daysUsed", type: "INTEGER" } - ] - }, - location: "US", - writeDisposition: "WRITE_TRUNCATE" - }; - promises.push(bigquery.dataset("bi").table("conversions").load(storage.bucket("baseline-bi").file("conversions.csv"), convMetadata)); + if (!simplifiedBI) { + // Send actions CSV to BigQuery + const actionsMetadata = { + sourceFormat: "CSV", + schema: { + fields: [ + { name: "timestamp", type: "INTEGER" }, + { name: "userId", type: "STRING" }, + { name: "date", type: "DATE" }, + { name: "time", type: "TIME" }, + { name: "action", type: "STRING" } + ], + }, + location: "US", + writeDisposition: "WRITE_TRUNCATE" + }; + promises.push(bigquery.dataset("bi").table("actions").load(storage.bucket("baseline-bi").file("actions.csv"), actionsMetadata)); - await Promise.all(promises); + // Send lengths CSV to BigQuery + const lengthsMetadata = { + sourceFormat: "CSV", + schema: { + fields: [ + { name: "timestamp", type: "INTEGER" }, + { name: "userId", type: "STRING" }, + { name: "len", type: "INTEGER" }, + { name: "bucket", type: "STRING" } + ], + }, + location: "US", + writeDisposition: "WRITE_TRUNCATE" + }; + promises.push(bigquery.dataset("bi").table("log_length").load(storage.bucket("baseline-bi").file("lengths.csv"), lengthsMetadata)); - // Send granite CSV to BigQuery - const graniteMetadata = { - sourceFormat: "CSV", - schema: { - fields: [ - { name: "baselineUserId", type: "STRING" }, - { name: "graniteUserId", type: "STRING" }, - { name: "timestamps", type: "STRING" } - ] - }, - location: "US", - writeDisposition: "WRITE_TRUNCATE" - }; - promises.push(bigquery.dataset("bi_granite").table("granite_main").load(storage.bucket("baseline-bi").file("granite.csv"), graniteMetadata)); + // Send gap fund CSV to BigQuery + const gapMetadata = { + sourceFormat: "CSV", + schema: { + fields: [ + { name: "userId", type: "STRING" } + ] + }, + location: "US", + writeDisposition: "WRITE_TRUNCATE" + }; + promises.push(bigquery.dataset("bi").table("gap_fund").load(storage.bucket("baseline-bi").file("gapfund.csv"), gapMetadata)); + + // Send conversions CSV to BigQuery + const convMetadata = { + sourceFormat: "CSV", + schema: { + fields: [ + { name: "timestamp", type: "INTEGER" }, + { name: "state", type: "STRING" }, + { name: "utm_source", type: "STRING" }, + { name: "utm_campaign", type: "STRING" }, + { name: "userId", type: "STRING" }, + { name: "lastUpdated", type: "INTEGER" }, + { name: "daysUsed", type: "INTEGER" } + ] + }, + location: "US", + writeDisposition: "WRITE_TRUNCATE" + }; + promises.push(bigquery.dataset("bi").table("conversions").load(storage.bucket("baseline-bi").file("conversions.csv"), convMetadata)); + } await Promise.all(promises); } diff --git a/scheduled-services/src/index.ts b/scheduled-services/src/index.ts index 16c20103..c79b98d8 100644 --- a/scheduled-services/src/index.ts +++ b/scheduled-services/src/index.ts @@ -1,12 +1,11 @@ import express from "express"; import { initializeApp } from "firebase-admin/app"; -import { getDatabase } from "firebase-admin/database"; -import { loadBasicBIData } from "./bi.js"; import { cleanUpAnonymous, cleanUpQuotas } from "./cleanup.js"; import { cleanUpTokens, logReminder, removeUserNotifications, sendCleanUpMessage } from "./messaging.js"; import { AnyMap } from "./helpers.js"; import * as Sentry from "@sentry/node"; import { audioDeadLetter, processAudio } from "./audio.js"; +import { loadBasicBIData } from "./bi.js"; initializeApp({ databaseURL: "https://getbaselineapp-default-rtdb.firebaseio.com/", @@ -18,7 +17,7 @@ const app = express(); Sentry.init({ dsn: "https://6e656f8deb434639a0bc75e5093b6f3c@o4504179120472064.ingest.sentry.io/4504348672196608", integrations: [ - ...Sentry.autoDiscoverNodePerformanceMonitoringIntegrations() + ...Sentry.autoDiscoverNodePerformanceMonitoringIntegrations() ], tracesSampleRate: 1.0, }); @@ -42,14 +41,15 @@ export const makeInternalRequest = (req: express.Request, path: string, data?: A app.post("/", (_, res) => { res.send(200); }); + app.post("/bi", async (req, res) => { - const db = (await getDatabase().ref("/").get()).val(); - await loadBasicBIData(db); + await loadBasicBIData(); // Now that data's been loaded, // kick off requests based on that makeInternalRequest(req, "messaging/logReminder"); + res.send(200); }); diff --git a/scheduled-services/src/messaging.ts b/scheduled-services/src/messaging.ts index a999f224..7eb3941a 100644 --- a/scheduled-services/src/messaging.ts +++ b/scheduled-services/src/messaging.ts @@ -164,7 +164,6 @@ export const logReminder = async (req: Request, res: Response) => { const chunkedAssociations = _.chunk(userMessageAssociation, 100); for (let i = 0; i < chunkedMessages.length; ++i) { const messagingResult = await getMessaging().sendEach(chunkedMessages[i]); - console.log(JSON.stringify(messagingResult)); makeInternalRequest(req, "messaging/cleanUpTokens", { userMessageAssociation: chunkedAssociations[i], messagingResult: messagingResult.responses