diff --git a/augur/application/cli/backend.py b/augur/application/cli/backend.py
index 2b5ec69042..0a47ce2560 100644
--- a/augur/application/cli/backend.py
+++ b/augur/application/cli/backend.py
@@ -18,6 +18,7 @@
 from augur.tasks.start_tasks import augur_collection_monitor, create_collection_status_records
 from augur.tasks.git.facade_tasks import clone_repos
+from augur.tasks.github.contributors import process_contributors
 from augur.tasks.github.util.github_api_key_handler import GithubApiKeyHandler
 from augur.tasks.gitlab.gitlab_api_key_handler import GitlabApiKeyHandler
 from augur.tasks.data_analysis.contributor_breadth_worker.contributor_breadth_worker import contributor_breadth_model
 
@@ -154,6 +155,8 @@ def start(ctx, disable_collection, development, pidfile, port):
         # start cloning repos when augur starts
         clone_repos.si().apply_async()
 
+        process_contributors.si().apply_async()
+
         augur_collection_monitor.si().apply_async()
 
     else:
diff --git a/augur/application/db/lib.py b/augur/application/db/lib.py
index 4d10b90117..d6c550f30b 100644
--- a/augur/application/db/lib.py
+++ b/augur/application/db/lib.py
@@ -274,6 +274,17 @@ def facade_bulk_insert_commits(logger, records):
         raise e
 
 
+def batch_insert_contributors(logger, data: List[dict]) -> None:
+
+    batch_size = 1000
+
+    for i in range(0, len(data), batch_size):
+        batch = data[i:i + batch_size]
+
+        bulk_insert_dicts(logger, batch, Contributor, ['cntrb_id'])
+
+
+
 def bulk_insert_dicts(logger, data: Union[List[dict], dict], table, natural_keys: List[str], return_columns: Optional[List[str]] = None, string_fields: Optional[List[str]] = None, on_conflict_update:bool = True) -> Optional[List[dict]]:
 
     if isinstance(data, list) is False:
@@ -383,7 +394,7 @@ def bulk_insert_dicts(logger, data: Union[List[dict], dict], table, natural_keys
 
         else:
             logger.error("Unable to insert data in 10 attempts")
-            return None
+            raise Exception("Unable to insert data in 10 attempts")
 
     if deadlock_detected is True:
         logger.error("Made it through even though Deadlock was detected")
@@ -421,7 +432,7 @@ def bulk_insert_dicts(logger, data: Union[List[dict], dict], table, natural_keys
 
         else:
             logger.error("Unable to insert and return data in 10 attempts")
-            return None
+            raise Exception("Unable to insert and return data in 10 attempts")
 
     if deadlock_detected is True:
         logger.error("Made it through even though Deadlock was detected")
diff --git a/augur/tasks/github/contributors.py b/augur/tasks/github/contributors.py
index f4fa7165d9..20f796647e 100644
--- a/augur/tasks/github/contributors.py
+++ b/augur/tasks/github/contributors.py
@@ -8,7 +8,7 @@
 from augur.tasks.github.facade_github.tasks import *
 from augur.application.db.models import Contributor
 from augur.application.db.util import execute_session_query
-from augur.application.db.lib import bulk_insert_dicts, get_session
+from augur.application.db.lib import bulk_insert_dicts, get_session, batch_insert_contributors
 from augur.tasks.github.util.github_random_key_auth import GithubRandomKeyAuth
 
 
@@ -63,7 +63,7 @@ def process_contributors():
             enriched_contributors.append(contributor_dict)
 
     logger.info(f"Enriching {len(enriched_contributors)} contributors")
-    bulk_insert_dicts(enriched_contributors, Contributor, ["cntrb_id"])
+    batch_insert_contributors(logger, enriched_contributors)
 
 
 
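The helper added to lib.py above caps each upsert at 1,000 rows by walking the contributor list in fixed-size slices. Below is a minimal, self-contained sketch of that chunking pattern; `upsert_batch` is a hypothetical stand-in for Augur's `bulk_insert_dicts`, and the data is fabricated for illustration:

```python
from typing import List

BATCH_SIZE = 1000  # same cap as batch_insert_contributors

def upsert_batch(batch: List[dict]) -> None:
    # Stand-in for bulk_insert_dicts(logger, batch, Contributor, ["cntrb_id"]).
    print(f"upserting {len(batch)} contributors")

def batch_insert(data: List[dict], batch_size: int = BATCH_SIZE) -> None:
    # Step through the list batch_size records at a time;
    # the final slice may be shorter than batch_size.
    for i in range(0, len(data), batch_size):
        upsert_batch(data[i:i + batch_size])

if __name__ == "__main__":
    batch_insert([{"cntrb_id": n} for n in range(2500)])  # batches of 1000, 1000, 500
```

Bounding the batch size keeps any single INSERT ... ON CONFLICT statement small, which should shorten lock hold times and make the retry logic in `bulk_insert_dicts` cheaper when a batch does fail.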
diff --git a/augur/tasks/github/events.py b/augur/tasks/github/events.py
index cf7df57582..7134fc77a1 100644
--- a/augur/tasks/github/events.py
+++ b/augur/tasks/github/events.py
@@ -14,7 +14,7 @@
 from augur.tasks.github.util.util import get_owner_repo
 from augur.tasks.util.worker_util import remove_duplicate_dicts
 from augur.application.db.models import PullRequestEvent, IssueEvent, Contributor, Repo
-from augur.application.db.lib import get_repo_by_repo_git, bulk_insert_dicts, get_issues_by_repo_id, get_pull_requests_by_repo_id, update_issue_closed_cntrbs_by_repo_id, get_session, get_engine, get_core_data_last_collected
+from augur.application.db.lib import get_repo_by_repo_git, bulk_insert_dicts, get_issues_by_repo_id, get_pull_requests_by_repo_id, update_issue_closed_cntrbs_by_repo_id, get_session, get_engine, get_core_data_last_collected, batch_insert_contributors
 
 
 platform_id = 1
@@ -82,7 +82,7 @@ def _insert_pr_events(self, events):
 
         bulk_insert_dicts(self._logger, events, PullRequestEvent, pr_event_natural_keys)
 
     def _insert_contributors(self, contributors):
-        bulk_insert_dicts(self._logger, contributors, Contributor, ["cntrb_id"])
+        batch_insert_contributors(self._logger, contributors)
 
     def _process_github_event_contributors(self, event):
diff --git a/augur/tasks/github/facade_github/core.py b/augur/tasks/github/facade_github/core.py
index 32ea13b041..55b2281ad8 100644
--- a/augur/tasks/github/facade_github/core.py
+++ b/augur/tasks/github/facade_github/core.py
@@ -3,7 +3,7 @@
 from augur.tasks.github.util.github_task_session import *
 from augur.application.db.models import *
 from augur.tasks.util.AugurUUID import GithubUUID
-from augur.application.db.lib import bulk_insert_dicts
+from augur.application.db.lib import bulk_insert_dicts, batch_insert_contributors
 from augur.tasks.github.util.github_data_access import GithubDataAccess
 
 
@@ -116,10 +116,9 @@ def query_github_contributors(logger, key_auth, github_url):
                 #"data_source": session.data_source
             }
 
-            cntrb_natural_keys = ['cntrb_id']
             #insert cntrb to table.
             #session.logger.info(f"Contributor: {cntrb} \n")
-            bulk_insert_dicts(logger, cntrb,Contributor,cntrb_natural_keys)
+            batch_insert_contributors(logger, [cntrb])
 
         except Exception as e:
             logger.error("Caught exception: {}".format(e))
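Note the shape of the converted call sites in core.py: `query_github_contributors` upserts one contributor at a time, so it wraps the record in a one-element list (`[cntrb]`) before calling the helper. The wrapping matters because `batch_insert_contributors` takes the length of its argument and slices it, which works on a list but not on a bare dict. A small illustration, with a fabricated record:

```python
# A hypothetical contributor record, for illustration only.
cntrb = {"cntrb_id": "abc123", "cntrb_login": "octocat"}

def batches(data, size=1000):
    # Mirrors the slicing loop inside batch_insert_contributors.
    return [data[i:i + size] for i in range(0, len(data), size)]

print(batches([cntrb]))   # one batch containing the single record
# batches(cntrb) would fail: slicing a dict raises
# TypeError: unhashable type: 'slice'
```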
diff --git a/augur/tasks/github/facade_github/tasks.py b/augur/tasks/github/facade_github/tasks.py
index 8b3a304a71..1b11f98223 100644
--- a/augur/tasks/github/facade_github/tasks.py
+++ b/augur/tasks/github/facade_github/tasks.py
@@ -7,7 +7,7 @@
 from augur.tasks.github.util.github_random_key_auth import GithubRandomKeyAuth
 from augur.application.db.models import Contributor
 from augur.tasks.github.facade_github.core import *
-from augur.application.db.lib import execute_sql, get_contributor_aliases_by_email, get_unresolved_commit_emails_by_name, get_contributors_by_full_name, get_repo_by_repo_git
+from augur.application.db.lib import execute_sql, get_contributor_aliases_by_email, get_unresolved_commit_emails_by_name, get_contributors_by_full_name, get_repo_by_repo_git, batch_insert_contributors
 from augur.tasks.git.util.facade_worker.facade_worker.facade00mainprogram import *
 
 
@@ -127,8 +127,7 @@ def process_commit_metadata(logger, auth, contributorQueue, repo_id, platform_id
 
         #Executes an upsert with sqlalchemy
         cntrb_natural_keys = ['cntrb_id']
-
-        bulk_insert_dicts(logger, cntrb,Contributor,cntrb_natural_keys)
+        batch_insert_contributors(logger, [cntrb])
 
         try:
             # Update alias after insertion. Insertion needs to happen first so we can get the autoincrementkey
diff --git a/augur/tasks/github/issues.py b/augur/tasks/github/issues.py
index ae1fb07cd9..37bee5c8dd 100644
--- a/augur/tasks/github/issues.py
+++ b/augur/tasks/github/issues.py
@@ -14,7 +14,7 @@
 from augur.tasks.util.worker_util import remove_duplicate_dicts
 from augur.application.db.models import Issue, IssueLabel, IssueAssignee, Contributor
 from augur.application.config import get_development_flag
-from augur.application.db.lib import get_repo_by_repo_git, bulk_insert_dicts, get_core_data_last_collected
+from augur.application.db.lib import get_repo_by_repo_git, bulk_insert_dicts, get_core_data_last_collected, batch_insert_contributors
 
 
 development = get_development_flag()
@@ -130,7 +130,7 @@ def process_issues(issues, task_name, repo_id, logger) -> None:
 
     # insert contributors from these issues
     logger.info(f"{task_name}: Inserting {len(contributors)} contributors")
-    bulk_insert_dicts(logger, contributors, Contributor, ["cntrb_id"])
+    batch_insert_contributors(logger, contributors)
 
 
     # insert the issues into the issues table.
diff --git a/augur/tasks/github/pull_requests/core.py b/augur/tasks/github/pull_requests/core.py
index 38d1136eb6..dd63edab6e 100644
--- a/augur/tasks/github/pull_requests/core.py
+++ b/augur/tasks/github/pull_requests/core.py
@@ -4,7 +4,7 @@
 
 from augur.application.db.data_parse import *
 from augur.application.db.session import DatabaseSession
-from augur.application.db.lib import bulk_insert_dicts
+from augur.application.db.lib import bulk_insert_dicts, batch_insert_contributors
 from augur.tasks.github.util.util import add_key_value_pair_to_dicts
 from augur.tasks.util.worker_util import remove_duplicate_dicts
 from augur.application.db.models import PullRequest, PullRequestLabel, PullRequestReviewer, PullRequestMeta, PullRequestAssignee, Contributor
@@ -144,7 +144,7 @@ def insert_pr_contributors(contributors: List[dict], logger, task_name: str) ->
 
     # insert contributors from these prs
     logger.info(f"{task_name}: Inserting {len(contributors)} contributors")
-    bulk_insert_dicts(logger, contributors, Contributor, ["cntrb_id"])
+    batch_insert_contributors(logger, contributors)
 
 
 def insert_prs(pr_dicts: List[dict], logger, task_name: str) -> Optional[List[dict]]:
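The bulk call sites converted in this diff all follow the same flow: accumulate contributor dicts while paging through the API, strip duplicates, then hand the whole list to `batch_insert_contributors` in one call. Here is a simplified sketch of that flow; `dedupe_by_key` is a hypothetical stand-in for Augur's `remove_duplicate_dicts`, whose exact signature is not shown in this diff:

```python
from typing import List

def dedupe_by_key(dicts: List[dict], key: str) -> List[dict]:
    # Keep the first occurrence of each natural key, preserving order.
    seen, unique = set(), []
    for d in dicts:
        if d[key] not in seen:
            seen.add(d[key])
            unique.append(d)
    return unique

contributors = [{"cntrb_id": 1}, {"cntrb_id": 2}, {"cntrb_id": 1}]
unique = dedupe_by_key(contributors, "cntrb_id")
print(len(unique))  # 2; the deduplicated list then goes to batch_insert_contributors
```

Deduplicating first matters for correctness as well as efficiency: in PostgreSQL, an INSERT ... ON CONFLICT DO UPDATE that touches the same key twice in one statement fails with "cannot affect row a second time".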
diff --git a/augur/tasks/github/pull_requests/tasks.py b/augur/tasks/github/pull_requests/tasks.py
index b65da7f4f3..c806cd1416 100644
--- a/augur/tasks/github/pull_requests/tasks.py
+++ b/augur/tasks/github/pull_requests/tasks.py
@@ -11,7 +11,7 @@
 from augur.application.db.models import PullRequest, Message, PullRequestReview, PullRequestLabel, PullRequestReviewer, PullRequestMeta, PullRequestAssignee, PullRequestReviewMessageRef, Contributor, Repo
 from augur.tasks.github.util.github_task_session import GithubTaskManifest
 from augur.tasks.github.util.github_random_key_auth import GithubRandomKeyAuth
-from augur.application.db.lib import get_session, get_repo_by_repo_git, bulk_insert_dicts, get_pull_request_reviews_by_repo_id
+from augur.application.db.lib import get_session, get_repo_by_repo_git, bulk_insert_dicts, get_pull_request_reviews_by_repo_id, batch_insert_contributors
 from augur.application.db.util import execute_session_query
 from ..messages import process_github_comment_contributors
 from augur.application.db.lib import get_secondary_data_last_collected, get_updated_prs, get_core_data_last_collected
@@ -260,7 +260,7 @@ def collect_pull_request_review_comments(repo_git: str, full_collection: bool) -
 
             contributors.append(contributor)
 
     logger.info(f"{owner}/{repo} Pr review messages: Inserting {len(contributors)} contributors")
-    bulk_insert_dicts(logger, contributors, Contributor, ["cntrb_id"])
+    batch_insert_contributors(logger, contributors)
 
     pr_review_comment_dicts = []
diff --git a/augur/tasks/gitlab/issues_task.py b/augur/tasks/gitlab/issues_task.py
index dc5b8ffc24..8a1415a7de 100644
--- a/augur/tasks/gitlab/issues_task.py
+++ b/augur/tasks/gitlab/issues_task.py
@@ -11,7 +11,7 @@
 from augur.tasks.github.util.util import get_gitlab_repo_identifier, add_key_value_pair_to_dicts
 from augur.application.db.models import Issue, IssueLabel, IssueAssignee, IssueMessageRef, Message, Contributor, Repo
 from augur.tasks.util.worker_util import remove_duplicate_dicts
-from augur.application.db.lib import bulk_insert_dicts, get_repo_by_repo_git, get_session
+from augur.application.db.lib import bulk_insert_dicts, get_repo_by_repo_git, get_session, batch_insert_contributors
 from augur.tasks.gitlab.gitlab_random_key_auth import GitlabRandomKeyAuth
 
 platform_id = 2
@@ -140,7 +140,7 @@ def process_issues(issues, task_name, repo_id, logger) -> None:
 
     # insert contributors from these issues
     logger.info(f"{task_name}: Inserting {len(contributors)} contributors")
-    bulk_insert_dicts(logger, contributors, Contributor, ["cntrb_id"])
+    batch_insert_contributors(logger, contributors)
 
     logger.info(f"{task_name}: Inserting {len(issue_dicts)} gitlab issues")
     issue_natural_keys = ["repo_id", "gh_issue_id"]
@@ -325,7 +325,7 @@ def process_gitlab_issue_messages(data, task_name, repo_id, logger, session):
     contributors = remove_duplicate_dicts(contributors)
 
     logger.info(f"{task_name}: Inserting {len(contributors)} contributors")
-    bulk_insert_dicts(logger, contributors, Contributor, ["cntrb_id"])
+    batch_insert_contributors(logger, contributors)
 
     logger.info(f"{task_name}: Inserting {len(message_dicts)} messages")
     message_natural_keys = ["platform_msg_id", "pltfrm_id"]
diff --git a/augur/tasks/gitlab/merge_request_task.py b/augur/tasks/gitlab/merge_request_task.py
index fd4674c3f5..7a3b006184 100644
--- a/augur/tasks/gitlab/merge_request_task.py
+++ b/augur/tasks/gitlab/merge_request_task.py
@@ -8,7 +8,7 @@
 from augur.application.db.models import PullRequest, PullRequestLabel, PullRequestMeta, PullRequestCommit, PullRequestFile, PullRequestMessageRef, Repo, Message, Contributor, PullRequestAssignee
 from augur.tasks.gitlab.gitlab_random_key_auth import GitlabRandomKeyAuth
 from augur.tasks.util.worker_util import remove_duplicate_dicts
-from augur.application.db.lib import bulk_insert_dicts, get_repo_by_repo_git, get_session
+from augur.application.db.lib import bulk_insert_dicts, get_repo_by_repo_git, get_session, batch_insert_contributors
 
 
 platform_id = 2
@@ -125,7 +125,7 @@ def process_merge_requests(data, task_name, repo_id, logger):
     contributors = remove_duplicate_dicts(contributors)
 
     logger.info(f"{task_name}: Inserting {len(contributors)} contributors")
-    bulk_insert_dicts(logger, contributors, Contributor, ["cntrb_id"])
+    batch_insert_contributors(logger, contributors)
 
     logger.info(f"{task_name}: Inserting mrs of length: {len(merge_requests)}")
     pr_natural_keys = ["repo_id", "pr_src_id"]
@@ -250,7 +250,7 @@ def process_gitlab_mr_messages(data, task_name, repo_id, logger, session):
     contributors = remove_duplicate_dicts(contributors)
 
     logger.info(f"{task_name}: Inserting {len(contributors)} mr message contributors")
-    bulk_insert_dicts(logger, contributors, Contributor, ["cntrb_id"])
+    batch_insert_contributors(logger, contributors)
 
     logger.info(f"{task_name}: Inserting {len(message_dicts)} mr messages")
     message_natural_keys = ["platform_msg_id", "pltfrm_id"]
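A behavioral change rides along with these call-site swaps: as patched in lib.py above, `bulk_insert_dicts` now raises after exhausting its ten attempts instead of returning None, so the GitHub and GitLab tasks here surface persistent insert failures rather than silently dropping contributor data. A rough sketch of that retry policy, simplified from the diff (the real function also tracks deadlocks and can return columns; its sleep behavior is not shown, so the backoff below is an assumption):

```python
import time
from typing import Callable

MAX_ATTEMPTS = 10

def insert_with_retries(do_insert: Callable[[], None]) -> None:
    for attempt in range(1, MAX_ATTEMPTS + 1):
        try:
            do_insert()
            return
        except Exception:
            time.sleep(0.1 * attempt)  # hypothetical backoff
    # Matches the failure mode introduced in this diff: raise, don't return None.
    raise Exception("Unable to insert and return data in 10 attempts")

if __name__ == "__main__":
    calls = []
    def flaky():
        calls.append(1)
        if len(calls) < 3:
            raise RuntimeError("transient failure")
    insert_with_retries(flaky)  # succeeds on the third attempt
    print(f"succeeded after {len(calls)} attempts")
```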
diff --git a/augur/tasks/init/celery_app.py b/augur/tasks/init/celery_app.py
index 1be45b1f00..c7ba1659a7 100644
--- a/augur/tasks/init/celery_app.py
+++ b/augur/tasks/init/celery_app.py
@@ -204,6 +204,7 @@ def setup_periodic_tasks(sender, **kwargs):
     from augur.tasks.start_tasks import augur_collection_monitor, augur_collection_update_weights
     from augur.tasks.start_tasks import non_repo_domain_tasks, retry_errored_repos
     from augur.tasks.git.facade_tasks import clone_repos
+    from augur.tasks.github.contributors import process_contributors
     from augur.tasks.db.refresh_materialized_views import refresh_materialized_views
     from augur.tasks.data_analysis.contributor_breadth_worker.contributor_breadth_worker import contributor_breadth_model
     from augur.application.db import temporary_database_engine
@@ -232,6 +233,10 @@
     logger.info(f"Setting 404 repos to be marked for retry on midnight each day")
     sender.add_periodic_task(crontab(hour=0, minute=0),retry_errored_repos.s())
 
+    one_hour_in_seconds = 60*60
+    sender.add_periodic_task(one_hour_in_seconds, process_contributors.s())
+
+
 
 @after_setup_logger.connect
 def setup_loggers(*args,**kwargs):
     """Override Celery loggers with our own."""
diff --git a/augur/tasks/start_tasks.py b/augur/tasks/start_tasks.py
index 3ba30ed707..11f4656519 100644
--- a/augur/tasks/start_tasks.py
+++ b/augur/tasks/start_tasks.py
@@ -81,7 +81,7 @@ def primary_repo_collect_phase(repo_git, full_collection):
     #Other tasks that don't need other tasks to run before they do just put in final group.
     repo_task_group = group(
         collect_repo_info.si(repo_git),
-        chain(primary_repo_jobs | issue_pr_task_update_weight_util.s(repo_git=repo_git),secondary_repo_jobs,process_contributors.si()),
+        chain(primary_repo_jobs | issue_pr_task_update_weight_util.s(repo_git=repo_git),secondary_repo_jobs),
         #facade_phase(logger,repo_git),
         collect_linux_badge_info.si(repo_git),
         collect_releases.si(repo_git),
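Finally, `process_contributors` moves out of the per-repo collection chain (the start_tasks.py hunk above) and onto its own cadence: dispatched once at startup in backend.py and every hour via Celery beat in celery_app.py. Below is a minimal sketch of the same scheduling pattern using Celery's standard periodic-task API; the app name, broker URL, and task body are hypothetical:

```python
from celery import Celery

app = Celery("sketch", broker="redis://localhost:6379/0")  # hypothetical broker

@app.task
def process_contributors_stub():
    print("enriching incomplete contributor records")

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Same pattern as the celery_app.py hunk: run the task every hour.
    one_hour_in_seconds = 60 * 60
    sender.add_periodic_task(one_hour_in_seconds, process_contributors_stub.s())

# At startup the task can also be dispatched immediately, detached from any chain:
# process_contributors_stub.si().apply_async()
```

Decoupling contributor enrichment from the repo chain means a slow enrichment pass no longer delays the secondary collection jobs for any single repository.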