Reduce errors in workers #3962

Merged · 4 commits · Feb 13, 2025

Changes from all commits:
@@ -459,14 +459,15 @@ def update_external_document_permissions_task(
     )
     doc_id = document_external_access.doc_id
     external_access = document_external_access.external_access

     try:
         with get_session_with_tenant(tenant_id) as db_session:
             # Add the users to the DB if they don't exist
             batch_add_ext_perm_user_if_not_exists(
                 db_session=db_session,
                 emails=list(external_access.external_user_emails),
+                continue_on_error=True,
             )
-            # Then we upsert the document's external permissions in postgres
+            # Then upsert the document's external permissions
             created_new_doc = upsert_document_external_perms(
                 db_session=db_session,
                 doc_id=doc_id,
@@ -490,11 +491,11 @@ def update_external_document_permissions_task(
f"action=update_permissions "
f"elapsed={elapsed:.2f}"
)

except Exception:
task_logger.exception(
f"Exception in update_external_document_permissions_task: "
f"connector_id={connector_id} "
f"doc_id={doc_id}"
f"connector_id={connector_id} doc_id={doc_id}"
)
return False

backend/onyx/background/celery/tasks/monitoring/tasks.py (7 changes: 6 additions & 1 deletion)
@@ -420,6 +420,7 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric]
     - Throughput (docs/min) (only if success)
     - Raw start/end times for each sync
     """
+
     one_hour_ago = get_db_current_time(db_session) - timedelta(hours=1)

     # Get all sync records that ended in the last hour
@@ -587,6 +588,10 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric]
             entity = db_session.scalar(
                 select(UserGroup).where(UserGroup.id == sync_record.entity_id)
             )
+        else:
+            # Only user groups and document set sync records have
+            # an associated entity we can use for latency metrics
+            continue

         if entity is None:
             task_logger.error(
@@ -777,7 +782,7 @@ def cloud_check_alembic() -> bool | None:

             tenant_to_revision[tenant_id] = result_scalar
         except Exception:
-            task_logger.warning(f"Tenant {tenant_id} has no revision!")
+            task_logger.error(f"Tenant {tenant_id} has no revision!")
             tenant_to_revision[tenant_id] = ALEMBIC_NULL_REVISION

     # get the total count of each revision
backend/onyx/connectors/slack/utils.py (21 changes: 2 additions & 19 deletions)
@@ -39,19 +39,6 @@ def get_message_link(
     return permalink


-def _make_slack_api_call_logged(
-    call: Callable[..., SlackResponse],
-) -> Callable[..., SlackResponse]:
-    @wraps(call)
-    def logged_call(**kwargs: Any) -> SlackResponse:
-        logger.debug(f"Making call to Slack API '{call.__name__}' with args '{kwargs}'")
-        result = call(**kwargs)
-        logger.debug(f"Call to Slack API '{call.__name__}' returned '{result}'")
-        return result
-
-    return logged_call
-
-
 def _make_slack_api_call_paginated(
     call: Callable[..., SlackResponse],
 ) -> Callable[..., Generator[dict[str, Any], None, None]]:
@@ -127,18 +114,14 @@ def rate_limited_call(**kwargs: Any) -> SlackResponse:
 def make_slack_api_call_w_retries(
     call: Callable[..., SlackResponse], **kwargs: Any
 ) -> SlackResponse:
-    return basic_retry_wrapper(
-        make_slack_api_rate_limited(_make_slack_api_call_logged(call))
-    )(**kwargs)
+    return basic_retry_wrapper(make_slack_api_rate_limited(call))(**kwargs)

A contributor commented on this change:
do we want to completely remove the logging? or should we truncate or shorten it? I can see us wanting to have debug logging here occasionally for troubleshooting purposes, but it definitely is not helpful to be at the current level of spam by default.
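
One possible middle ground the comment hints at, sketched here purely for illustration (not part of this PR; the function name mirrors the removed wrapper, and the 300-character cap is an arbitrary choice): keep the debug wrapper, but log only a bounded prefix of the arguments and response.

import logging
from functools import wraps
from typing import Any, Callable

from slack_sdk.web import SlackResponse

logger = logging.getLogger(__name__)

_MAX_LOGGED_LEN = 300  # arbitrary cap to avoid log spam


def _make_slack_api_call_logged_truncated(
    call: Callable[..., SlackResponse],
) -> Callable[..., SlackResponse]:
    @wraps(call)
    def logged_call(**kwargs: Any) -> SlackResponse:
        # Log a truncated repr of the args rather than the full payload
        logger.debug(
            f"Slack API '{call.__name__}' args: {repr(kwargs)[:_MAX_LOGGED_LEN]}"
        )
        result = call(**kwargs)
        # Same treatment for the (often very large) response
        logger.debug(
            f"Slack API '{call.__name__}' result: {repr(result)[:_MAX_LOGGED_LEN]}"
        )
        return result

    return logged_call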



 def make_paginated_slack_api_call_w_retries(
     call: Callable[..., SlackResponse], **kwargs: Any
 ) -> Generator[dict[str, Any], None, None]:
     return _make_slack_api_call_paginated(
-        basic_retry_wrapper(
-            make_slack_api_rate_limited(_make_slack_api_call_logged(call))
-        )
+        basic_retry_wrapper(make_slack_api_rate_limited(call))
     )(**kwargs)


backend/onyx/db/users.py (24 changes: 19 additions & 5 deletions)
@@ -6,6 +6,7 @@
 from fastapi_users.password import PasswordHelper
 from sqlalchemy import func
 from sqlalchemy import select
+from sqlalchemy.exc import IntegrityError
 from sqlalchemy.orm import Session
 from sqlalchemy.sql import expression
 from sqlalchemy.sql.elements import ColumnElement
@@ -274,7 +275,7 @@ def _generate_ext_permissioned_user(email: str) -> User:


 def batch_add_ext_perm_user_if_not_exists(
-    db_session: Session, emails: list[str]
+    db_session: Session, emails: list[str], continue_on_error: bool = False
 ) -> list[User]:
     lower_emails = [email.lower() for email in emails]
     found_users, missing_lower_emails = _get_users_by_emails(db_session, lower_emails)
@@ -283,10 +284,23 @@ def batch_add_ext_perm_user_if_not_exists(
     for email in missing_lower_emails:
         new_users.append(_generate_ext_permissioned_user(email=email))

-    db_session.add_all(new_users)
-    db_session.commit()
-
-    return found_users + new_users
+    try:
+        db_session.add_all(new_users)
+        db_session.commit()
+    except IntegrityError:
+        db_session.rollback()
+        if not continue_on_error:
+            raise
+        for user in new_users:
+            try:
+                db_session.add(user)
+                db_session.commit()
+            except IntegrityError:
+                db_session.rollback()
+                continue
+    # Fetch all users again to ensure we have the most up-to-date list
+    all_users, _ = _get_users_by_emails(db_session, lower_emails)
+    return all_users


def delete_user_from_db(
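For illustration, a hypothetical call site for the new flag, mirroring the worker change in the first file (`db_session` is assumed to be an open SQLAlchemy Session, and the emails are made up):

from onyx.db.users import batch_add_ext_perm_user_if_not_exists

users = batch_add_ext_perm_user_if_not_exists(
    db_session=db_session,
    emails=["alice@example.com", "bob@example.com"],
    # Skip rows that hit IntegrityError instead of failing the whole batch
    continue_on_error=True,
)

With continue_on_error=False (the default), a single conflicting row still raises, preserving the old behavior for callers that want the batch to be all-or-nothing.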
backend/onyx/document_index/vespa/index.py (21 changes: 16 additions & 5 deletions)
@@ -17,6 +17,7 @@

 import httpx  # type: ignore
 import requests  # type: ignore
+from retry import retry

 from onyx.configs.chat_configs import DOC_TIME_DECAY
 from onyx.configs.chat_configs import NUM_RETURNED_HITS
@@ -549,6 +550,11 @@ def update(
             time.monotonic() - update_start,
         )

+    @retry(
+        tries=3,
+        delay=1,
+        backoff=2,
+    )
     def _update_single_chunk(
         self,
         doc_chunk_id: UUID,
@@ … @@
     ) -> None:
         """
         Update a single "chunk" (document) in Vespa using its chunk ID.
+        Retries if we encounter transient HTTPStatusError (e.g., overload).
         """

         update_dict: dict[str, dict] = {"fields": {}}
@@ … @@
             update_dict["fields"][BOOST] = {"assign": fields.boost}

         if fields.document_sets is not None:
-            # WeightedSet<string> needs a map { item: weight, ... }
             update_dict["fields"][DOCUMENT_SETS] = {
                 "assign": {document_set: 1 for document_set in fields.document_sets}
             }

         if fields.access is not None:
-            # Similar to above
             update_dict["fields"][ACCESS_CONTROL_LIST] = {
                 "assign": {acl_entry: 1 for acl_entry in fields.access.to_acl()}
             }
@@ … @@
             logger.error("Update request received but nothing to update.")
             return

-        vespa_url = f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{doc_chunk_id}?create=true"
+        vespa_url = (
+            f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{doc_chunk_id}"
+            "?create=true"
+        )

         try:
             resp = http_client.put(
@@ … @@
             )
             resp.raise_for_status()
         except httpx.HTTPStatusError as e:
-            error_message = f"Failed to update doc chunk {doc_chunk_id} (doc_id={doc_id}). Details: {e.response.text}"
-            logger.error(error_message)
+            logger.error(
+                f"Failed to update doc chunk {doc_chunk_id} (doc_id={doc_id}). "
+                f"Details: {e.response.text}"
+            )
+            # Re-raise so the @retry decorator will catch and retry
             raise

     def update_single(
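For reference, the @retry decorator added in index.py is from the third-party retry package: with tries=3, delay=1, backoff=2 it makes up to three attempts, sleeping 1s after the first failure and 2s after the second, then re-raises. A minimal self-contained sketch of that behavior (my approximation of the semantics, not the package's actual implementation):

import time
from functools import wraps
from typing import Any, Callable, TypeVar

T = TypeVar("T")


def retry_sketch(
    tries: int = 3, delay: float = 1.0, backoff: float = 2.0
) -> Callable[[Callable[..., T]], Callable[..., T]]:
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> T:
            wait = delay
            for attempt in range(1, tries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == tries:
                        raise  # attempts exhausted: propagate the last error
                    time.sleep(wait)  # 1s after the 1st failure, then 2s
                    wait *= backoff
            raise AssertionError("unreachable")

        return wrapper

    return decorator

Because _update_single_chunk re-raises after logging the HTTPStatusError, each transient Vespa failure is retried up to the configured three attempts before the exception finally propagates to the caller.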