Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Additional error handling + logging for google drive connector #3563

Merged
merged 2 commits into from
Dec 30, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions backend/onyx/connectors/google_drive/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from google.oauth2.credentials import Credentials as OAuthCredentials # type: ignore
from google.oauth2.service_account import Credentials as ServiceAccountCredentials # type: ignore
from googleapiclient.errors import HttpError # type: ignore

from onyx.configs.app_configs import INDEX_BATCH_SIZE
from onyx.configs.app_configs import MAX_FILE_SIZE_BYTES
Expand All @@ -20,6 +21,7 @@
from onyx.connectors.google_drive.file_retrieval import get_all_files_for_oauth
from onyx.connectors.google_drive.file_retrieval import get_all_files_in_my_drive
from onyx.connectors.google_drive.file_retrieval import get_files_in_shared_drive
from onyx.connectors.google_drive.file_retrieval import get_root_folder_id
from onyx.connectors.google_drive.models import GoogleDriveFileType
from onyx.connectors.google_utils.google_auth import get_google_creds
from onyx.connectors.google_utils.google_utils import execute_paginated_retrieval
Expand All @@ -41,6 +43,7 @@
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.interfaces import SlimConnector
from onyx.utils.logger import setup_logger
from onyx.utils.retry_wrapper import retry_builder

logger = setup_logger()
# TODO: Improve this by using the batch utility: https://googleapis.github.io/google-api-python-client/docs/batch.html
Expand Down Expand Up @@ -286,13 +289,30 @@ def _impersonate_user_for_retrieval(
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,
) -> Iterator[GoogleDriveFileType]:
logger.info(f"Impersonating user {user_email}")

drive_service = get_drive_service(self.creds, user_email)

# validate that the user has access to the drive APIs by performing a simple
# request and checking for a 401
try:
retry_builder()(get_root_folder_id)(drive_service)
except HttpError as e:
if e.status_code == 401:
# fail gracefully, let the other impersonations continue
# one user without access shouldn't block the entire connector
logger.exception(
f"User '{user_email}' does not have access to the drive APIs."
)
return
raise

# if we are including my drives, try to get the current user's my
# drive if any of the following are true:
# - include_my_drives is true
# - the current user's email is in the requested emails
if self.include_my_drives or user_email in self._requested_my_drive_emails:
logger.info(f"Getting all files in my drive as '{user_email}'")
yield from get_all_files_in_my_drive(
service=drive_service,
update_traversed_ids_func=self._update_traversed_parent_ids,
Expand All @@ -303,6 +323,7 @@ def _impersonate_user_for_retrieval(

remaining_drive_ids = filtered_drive_ids - self._retrieved_ids
for drive_id in remaining_drive_ids:
logger.info(f"Getting files in shared drive '{drive_id}' as '{user_email}'")
yield from get_files_in_shared_drive(
service=drive_service,
drive_id=drive_id,
Expand All @@ -314,6 +335,7 @@ def _impersonate_user_for_retrieval(

remaining_folders = filtered_folder_ids - self._retrieved_ids
for folder_id in remaining_folders:
logger.info(f"Getting files in folder '{folder_id}' as '{user_email}'")
yield from crawl_folders_for_files(
service=drive_service,
parent_id=folder_id,
Expand Down Expand Up @@ -344,6 +366,15 @@ def _manage_service_account_retrieval(
elif self.include_shared_drives:
drive_ids_to_retrieve = all_drive_ids

# checkpoint - we've found all users and drives, now time to actually start
# fetching stuff
logger.info(f"Found {len(all_org_emails)} users to impersonate")
logger.debug(f"Users: {all_org_emails}")
logger.info(f"Found {len(drive_ids_to_retrieve)} drives to retrieve")
logger.debug(f"Drives: {drive_ids_to_retrieve}")
logger.info(f"Found {len(folder_ids_to_retrieve)} folders to retrieve")
logger.debug(f"Folders: {folder_ids_to_retrieve}")

# Process users in parallel using ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=10) as executor:
future_to_email = {
Expand Down Expand Up @@ -380,6 +411,13 @@ def _manage_oauth_retrieval(
drive_service = get_drive_service(self.creds, self.primary_admin_email)

if self.include_files_shared_with_me or self.include_my_drives:
logger.info(
f"Getting shared files/my drive files for OAuth "
f"with include_files_shared_with_me={self.include_files_shared_with_me}, "
f"include_my_drives={self.include_my_drives}, "
f"include_shared_drives={self.include_shared_drives}."
f"Using '{self.primary_admin_email}' as the account."
)
yield from get_all_files_for_oauth(
service=drive_service,
include_files_shared_with_me=self.include_files_shared_with_me,
Expand Down Expand Up @@ -412,6 +450,9 @@ def _manage_oauth_retrieval(
drive_ids_to_retrieve = all_drive_ids

for drive_id in drive_ids_to_retrieve:
logger.info(
f"Getting files in shared drive '{drive_id}' as '{self.primary_admin_email}'"
)
yield from get_files_in_shared_drive(
service=drive_service,
drive_id=drive_id,
Expand All @@ -425,6 +466,9 @@ def _manage_oauth_retrieval(
# that could be folders.
remaining_folders = folder_ids_to_retrieve - self._retrieved_ids
for folder_id in remaining_folders:
logger.info(
f"Getting files in folder '{folder_id}' as '{self.primary_admin_email}'"
)
yield from crawl_folders_for_files(
service=drive_service,
parent_id=folder_id,
Expand Down
Loading