Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HOTFIX] Fix topic modelling issue where it fails sometimes #499

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 49 additions & 34 deletions admin_app/src/app/dashboard/components/Insights.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ interface InsightProps {
customDateParams?: CustomDateParams;
}

const POLLING_INTERVAL = 3000;
const POLLING_TIMEOUT = 90000;
const POLLING_INTERVAL = 3 * 1000;
const POLLING_TIMEOUT = 3 * 60 * 1000;

const Insight: React.FC<InsightProps> = ({ timePeriod, customDateParams }) => {
const { token } = useAuth();
Expand Down Expand Up @@ -51,7 +51,10 @@ const Insight: React.FC<InsightProps> = ({ timePeriod, customDateParams }) => {
const runRefresh = (period: Period) => {
const periodKey = period;
setRefreshingByTimePeriod((prev) => ({ ...prev, [periodKey]: true }));
setDataStatusByTimePeriod((prev) => ({ ...prev, [periodKey]: "in_progress" }));
setDataStatusByTimePeriod((prev) => ({
...prev,
[periodKey]: "in_progress",
}));

if (
period === "custom" &&
Expand All @@ -69,8 +72,14 @@ const Insight: React.FC<InsightProps> = ({ timePeriod, customDateParams }) => {
pollData(period);
})
.catch((error) => {
setRefreshingByTimePeriod((prev) => ({ ...prev, [periodKey]: false }));
setDataStatusByTimePeriod((prev) => ({ ...prev, [periodKey]: "error" }));
setRefreshingByTimePeriod((prev) => ({
...prev,
[periodKey]: false,
}));
setDataStatusByTimePeriod((prev) => ({
...prev,
[periodKey]: "error",
}));
setSnackMessage({
message: error.message || "There was a system error.",
color: "error",
Expand All @@ -83,8 +92,14 @@ const Insight: React.FC<InsightProps> = ({ timePeriod, customDateParams }) => {
pollData(period);
})
.catch((error) => {
setRefreshingByTimePeriod((prev) => ({ ...prev, [periodKey]: false }));
setDataStatusByTimePeriod((prev) => ({ ...prev, [periodKey]: "error" }));
setRefreshingByTimePeriod((prev) => ({
...prev,
[periodKey]: false,
}));
setDataStatusByTimePeriod((prev) => ({
...prev,
[periodKey]: "error",
}));
setSnackMessage({
message: error.message || "There was a system error.",
color: "error",
Expand All @@ -102,8 +117,14 @@ const Insight: React.FC<InsightProps> = ({ timePeriod, customDateParams }) => {
try {
const elapsedTime = Date.now() - startTime;
if (elapsedTime >= POLLING_TIMEOUT) {
setRefreshingByTimePeriod((prev) => ({ ...prev, [periodKey]: false }));
setDataStatusByTimePeriod((prev) => ({ ...prev, [periodKey]: "error" }));
setRefreshingByTimePeriod((prev) => ({
...prev,
[periodKey]: false,
}));
setDataStatusByTimePeriod((prev) => ({
...prev,
[periodKey]: "error",
}));
clearInterval(pollingTimerRef.current[periodKey]!);
pollingTimerRef.current[periodKey] = null;
setSnackMessage({
Expand Down Expand Up @@ -140,7 +161,10 @@ const Insight: React.FC<InsightProps> = ({ timePeriod, customDateParams }) => {
...prev,
[periodKey]: dataFromBackendResponse,
}));
setRefreshingByTimePeriod((prev) => ({ ...prev, [periodKey]: false }));
setRefreshingByTimePeriod((prev) => ({
...prev,
[periodKey]: false,
}));
clearInterval(pollingTimerRef.current[periodKey]!);
pollingTimerRef.current[periodKey] = null;
setSnackMessage({
Expand All @@ -155,7 +179,10 @@ const Insight: React.FC<InsightProps> = ({ timePeriod, customDateParams }) => {
...prev,
[periodKey]: dataFromBackendResponse,
}));
setRefreshingByTimePeriod((prev) => ({ ...prev, [periodKey]: false }));
setRefreshingByTimePeriod((prev) => ({
...prev,
[periodKey]: false,
}));
clearInterval(pollingTimerRef.current[periodKey]!);
pollingTimerRef.current[periodKey] = null;
setSnackMessage({
Expand Down Expand Up @@ -270,32 +297,18 @@ const Insight: React.FC<InsightProps> = ({ timePeriod, customDateParams }) => {
setAiSummary("Not available.");
}
};

useEffect(() => {
const currentData = dataByTimePeriod[timePeriod] || {
status: "not_started",
refreshTimeStamp: "",
data: [],
unclustered_queries: [],
error_message: "",
failure_step: "",
};
if (selectedTopicId !== null) {
const selectedTopic = currentData.data.find(
(topic) => topic.topic_id === selectedTopicId,
);
if (selectedTopic) {
setTopicQueries(selectedTopic.topic_samples);
setAiSummary(selectedTopic.topic_summary);
} else {
setTopicQueries([]);
setAiSummary("Not available.");
const periodKey = timePeriod;
if (dataByTimePeriod[periodKey]) {
updateUIForCurrentTimePeriod(dataByTimePeriod[periodKey]);
if (
dataStatusByTimePeriod[periodKey] === "in_progress" &&
!pollingTimerRef.current[periodKey]
) {
pollData(timePeriod);
}
} else {
setTopicQueries([]);
setAiSummary("Not available.");
}
}, [dataByTimePeriod, selectedTopicId, timePeriod]);
}, [timePeriod, dataByTimePeriod, dataStatusByTimePeriod]);

const currentData = dataByTimePeriod[timePeriod] || {
status: "not_started",
Expand Down Expand Up @@ -351,7 +364,9 @@ const Insight: React.FC<InsightProps> = ({ timePeriod, customDateParams }) => {
/>
</Box>
</Paper>

<BokehPlot timePeriod={timePeriod} token={token} />

<Snackbar
open={snackMessage.message !== null}
autoHideDuration={5000}
Expand Down
3 changes: 1 addition & 2 deletions admin_app/src/app/dashboard/components/insights/Bokeh.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ const BokehPlot: React.FC<BokehPlotProps> = ({ timePeriod, token }) => {
if (token) {
try {
const data = await getEmbeddingData(timePeriod, token);

const { embed } = await import("@bokeh/bokehjs");

if (plotRef.current) {
Expand All @@ -56,7 +55,7 @@ const BokehPlot: React.FC<BokehPlotProps> = ({ timePeriod, token }) => {
} catch (error: any) {
console.error("Error fetching the plot:", error);
setLoadError(`There was a system failure. Ensure you have both content and queries present
and clicked the "Generate Insights" button.`);
and clicked the "Run Discovery" button.`);
} finally {
setLoading(false);
}
Expand Down
103 changes: 53 additions & 50 deletions core_backend/app/dashboard/routers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""This module contains FastAPI routers for dashboard endpoints."""

import json
import os
from datetime import date, datetime, timedelta, timezone
from typing import Annotated, Literal, Optional

Expand All @@ -18,7 +19,7 @@
from sqlalchemy.ext.asyncio import AsyncSession

from ..auth.dependencies import get_current_workspace_name
from ..database import get_async_session
from ..database import get_async_session, get_sqlalchemy_async_engine
from ..users.models import WorkspaceDB
from ..utils import setup_logger
from ..workspaces.utils import get_workspace_by_workspace_name
Expand Down Expand Up @@ -48,6 +49,8 @@
)
from .topic_modeling import topic_model_queries

os.environ["TOKENIZERS_PARALLELISM"] = "false"

TAG_METADATA = {
"name": "Dashboard",
"description": "_Requires user login._ Dashboard data fetching operations.",
Expand Down Expand Up @@ -328,7 +331,6 @@ async def refresh_insights_frequency(

background_tasks.add_task(
refresh_insights,
asession=asession,
end_date=end_dt,
request=request,
start_date=start_dt,
Expand Down Expand Up @@ -512,7 +514,6 @@ def get_freq_start_end_date(

async def refresh_insights(
*,
asession: AsyncSession = Depends(get_async_session),
end_date: date,
request: Request,
start_date: date,
Expand All @@ -538,61 +539,63 @@ async def refresh_insights(
workspace_db
The workspace database object.
"""

redis = request.app.state.redis
await redis.set(
f"{workspace_db.workspace_name}_insights_{timeframe}_results",
TopicsData(
data=[],
refreshTimeStamp=datetime.now(timezone.utc).isoformat(),
status="in_progress",
).model_dump_json(),
)

step = None
try:
step = "Retrieve queries"
time_period_queries = await get_raw_queries(
asession=asession,
end_date=end_date,
start_date=start_date,
workspace_id=workspace_db.workspace_id,
)

step = "Retrieve contents"
content_data = await get_raw_contents(
asession=asession, workspace_id=workspace_db.workspace_id
)

topic_output, embeddings_df = await topic_model_queries(
content_data=content_data,
query_data=time_period_queries,
workspace_id=workspace_db.workspace_id,
)

step = "Write to Redis"
embeddings_json = embeddings_df.to_json(orient="split")
embeddings_key = f"{workspace_db.workspace_name}_embeddings_{timeframe}"
await redis.set(embeddings_key, embeddings_json)
await redis.set(
f"{workspace_db.workspace_name}_insights_{timeframe}_results",
topic_output.model_dump_json(),
)
return
except Exception as e: # pylint: disable=W0718
error_msg = str(e)
logger.error(error_msg)
async with AsyncSession(
get_sqlalchemy_async_engine(), expire_on_commit=False
) as asession:
redis = request.app.state.redis
await redis.set(
f"{workspace_db.workspace_name}_insights_{timeframe}_results",
TopicsData(
data=[],
error_message=error_msg,
failure_step=step,
refreshTimeStamp=datetime.now(timezone.utc).isoformat(),
status="error",
status="in_progress",
).model_dump_json(),
)

step = None
try:
step = "Retrieve queries"
time_period_queries = await get_raw_queries(
asession=asession,
end_date=end_date,
start_date=start_date,
workspace_id=workspace_db.workspace_id,
)

step = "Retrieve contents"
content_data = await get_raw_contents(
asession=asession, workspace_id=workspace_db.workspace_id
)

topic_output, embeddings_df = await topic_model_queries(
content_data=content_data,
query_data=time_period_queries,
workspace_id=workspace_db.workspace_id,
)

step = "Write to Redis"
embeddings_json = embeddings_df.to_json(orient="split")
embeddings_key = f"{workspace_db.workspace_name}_embeddings_{timeframe}"
await redis.set(embeddings_key, embeddings_json)
await redis.set(
f"{workspace_db.workspace_name}_insights_{timeframe}_results",
topic_output.model_dump_json(),
)
return
except Exception as e: # pylint: disable=W0718
error_msg = str(e)
logger.error(error_msg)
await redis.set(
f"{workspace_db.workspace_name}_insights_{timeframe}_results",
TopicsData(
data=[],
error_message=error_msg,
failure_step=step,
refreshTimeStamp=datetime.now(timezone.utc).isoformat(),
status="error",
).model_dump_json(),
)


async def retrieve_overview(
*,
Expand Down
2 changes: 1 addition & 1 deletion core_backend/startup.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/bin/bash
python -m alembic upgrade head
exec gunicorn -k main.Worker -w 4 -b 0.0.0.0:8000 --preload \
-c gunicorn_hooks_config.py main:app -t 100
-c gunicorn_hooks_config.py main:app -t 200
#
Loading