diff --git a/services/lock_manager.py b/services/lock_manager.py index 0cb691cf8..618443edd 100644 --- a/services/lock_manager.py +++ b/services/lock_manager.py @@ -17,6 +17,7 @@ class LockType(Enum): BUNDLE_ANALYSIS_PROCESSING = "bundle_analysis_processing" BUNDLE_ANALYSIS_NOTIFY = "bundle_analysis_notify" NOTIFICATION = "notify" + NOTIFICATION_ORCHESTRATOR = "notification_orchestrator" # TODO: port existing task locking to use `LockManager` diff --git a/tasks/__init__.py b/tasks/__init__.py index 7c9c39823..a3f39ca36 100644 --- a/tasks/__init__.py +++ b/tasks/__init__.py @@ -31,6 +31,7 @@ from tasks.label_analysis import label_analysis_task from tasks.manual_trigger import manual_trigger_task from tasks.new_user_activated import new_user_activated_task +from tasks.notification_orchestrator import notification_orchestrator_task from tasks.notify import notify_task from tasks.notify_error import notify_error_task from tasks.plan_manager_task import daily_plan_manager_task_name diff --git a/tasks/notification_orchestrator.py b/tasks/notification_orchestrator.py new file mode 100644 index 000000000..b6eb00c64 --- /dev/null +++ b/tasks/notification_orchestrator.py @@ -0,0 +1,624 @@ +import logging +import re +from enum import Enum +from functools import partial +from typing import Optional, TypedDict + +from celery.exceptions import MaxRetriesExceededError +from shared.celery_config import ( + compute_comparison_task_name, + notification_orchestrator_task_name, + notify_task_name, + pulls_task_name, +) +from shared.torngit.exceptions import TorngitClientError, TorngitServerFailureError +from shared.yaml import UserYaml +from sqlalchemy.orm.session import Session + +from app import celery_app +from celery_config import notify_error_task_name +from database.enums import CommitErrorTypes, ReportType +from database.models import Commit, Pull +from helpers.checkpoint_logger.flows import UploadFlow +from helpers.clock import get_seconds_to_next_hour +from helpers.exceptions import NoConfiguredAppsAvailable, RepositoryWithoutValidBotError +from helpers.github_installation import get_installation_name_for_owner_for_task +from helpers.save_commit_error import save_commit_error +from services.comparison import ( + get_or_create_comparison, +) +from services.lock_manager import LockManager, LockRetry, LockType +from services.processing.state import ProcessingState, should_trigger_postprocessing +from services.processing.types import ProcessingResult +from services.redis import Redis, get_redis_connection +from services.report import ReportService +from services.repository import ( + get_repo_provider_service, +) +from services.yaml import read_yaml_field +from tasks.base import BaseCodecovTask +from tasks.upload_processor import UPLOAD_PROCESSING_LOCK_NAME + +log = logging.getLogger(__name__) +regexp_ci_skip = re.compile(r"\[(ci|skip| |-){3,}\]") + + +class ShouldCallNotifyResult(Enum): + NOTIFY = "notify" + NOTIFY_ERROR = "notify_error" + DO_NOT_NOTIFY = "do_not_notify" + WAIT_TO_NOTIFY = "wait_to_notify" + + +class ShouldCallNotifyResponse(TypedDict): + notification_result: ShouldCallNotifyResult + reason: str + # This is currently not used, but my idea here was to provide it and log this message in switch statements. + # Logging could become a bit less flexible but there's less logging altogether. Thoughts? + message: str + + +# TODO: Ask: does this need a throws = (SoftTimeLimitExceeded,)? +class NotificationOrchestratorTask( + BaseCodecovTask, name=notification_orchestrator_task_name +): + def run_impl( + self, + db_session: Session, + *args, + repoid: int, + commitid: str, + commit_yaml: dict, + processing_results: list[ProcessingResult], + report_code: str | None = None, + empty_upload=None, + **kwargs, + ): + lock_manager = LockManager( + repoid=repoid, + commitid=commitid, + # Currently hardcoded to coverage, should be variable to the report type + report_type=ReportType.COVERAGE, + # TODO: Ask if we need a specific timeout hard timeout for this task, unsure + lock_timeout=max(80, self.hard_time_limit_task), + ) + try: + lock_acquired = False + with lock_manager.locked( + lock_type=LockType.NOTIFICATION_ORCHESTRATOR, + retry_num=self.request.retries, + ): + lock_acquired = True + return self.run_impl_within_lock( + db_session, + repoid=repoid, + commitid=commitid, + commit_yaml=commit_yaml, + processing_results=processing_results, + report_code=report_code, + empty_upload=empty_upload, + **kwargs, + ) + # TODO: What should happen when there's a lock error? I think this should change to some retrying mechanism + # Should it be a LockRetry or a LockError? + except LockRetry as err: + ( + log.info( + "Not notifying because there is another notification already happening", + extra=dict( + repoid=repoid, + commitid=commitid, + error_type=type(err), + lock_acquired=lock_acquired, + ), + ), + ) + UploadFlow.log(UploadFlow.NOTIF_LOCK_ERROR) + return { + "notified": False, + "notifications": None, + "reason": "unobtainable_lock", + } + + def run_impl_within_lock( + self, + db_session: Session, + *args, + repoid: int, + commitid: str, + commit_yaml: dict, + processing_results: list[ProcessingResult], + report_code: str | None = None, + empty_upload=None, + **kwargs, + ): + repoid = int(repoid) + commit = ( + db_session.query(Commit) + .filter(Commit.repoid == repoid, Commit.commitid == commitid) + .first() + ) + assert commit, "Commit not found in database." + commit_yaml = UserYaml(commit_yaml) + + log_extra_dict = { + "repoid": commit.repoid, + "commit": commit.commitid, + "commit_yaml": commit_yaml.to_dict(), + "processing_results": processing_results, + "report_code": report_code, + "parent_task": self.request.parent_id, + } + + # Main logic that controls the notification states and decides what to do based on them + should_call_notification = self.should_call_notifications( + commit=commit, + commit_yaml=commit_yaml, + processing_results=processing_results, + report_code=report_code, + log_extra_dict=log_extra_dict, + ) + match should_call_notification: + case {"notification_result": ShouldCallNotifyResult.NOTIFY}: + notify_kwargs = { + "repoid": repoid, + "commitid": commitid, + "current_yaml": commit_yaml.to_dict(), + "empty_upload": empty_upload, + } + notify_kwargs = UploadFlow.save_to_kwargs(notify_kwargs) + task = self.app.tasks[notify_task_name].apply_async( + kwargs=notify_kwargs + ) + log.info( + "Scheduling notify task", + extra={ + **log_extra_dict, + "notify_task_id": task.id, + }, + ) + self.orchestrator_side_effects(db_session=db_session, commit=commit) + # TODO: We should add a UploadFlow.ATTEMPTING_NOTIFICATION + case { + "notification_result": ShouldCallNotifyResult.DO_NOT_NOTIFY, + "reason": reason, + }: + log.info( + "Skipping notify task", extra={**log_extra_dict, "reason": reason} + ) + UploadFlow.log(UploadFlow.SKIPPING_NOTIFICATION) + case { + "notification_result": ShouldCallNotifyResult.WAIT_TO_NOTIFY, + "reason": reason, + "extra": {"max_retries": max_retries, "countdown": countdown}, + }: + log.info( + "Unable to start notifications. Retrying again later.", + extra={ + **log_extra_dict, + "reason": reason, + "countdown": countdown, + "max_retries": max_retries, + }, + ) + return self._attempt_retry( + max_retries=max_retries, + countdown=countdown, + log_extra_dict=log_extra_dict, + ) + case {"notification_result": ShouldCallNotifyResult.NOTIFY_ERROR}: + log.info("Attempting to notify error", extra=log_extra_dict) + notify_error_kwargs = { + "repoid": repoid, + "commitid": commitid, + "current_yaml": commit_yaml.to_dict(), + } + notify_error_kwargs = UploadFlow.save_to_kwargs(notify_error_kwargs) + task = self.app.tasks[notify_error_task_name].apply_async( + kwargs=notify_error_kwargs + ) + + return should_call_notification + + def should_call_notifications( + self, + commit: Commit, + commit_yaml: UserYaml, + processing_results: list[ProcessingResult], + log_extra_dict: dict, + report_code: str | None = None, + ) -> ShouldCallNotifyResponse: + # Defines all the logic to consider a notification + should_notify_check_functions = [ + partial( + self.upload_processing_checks, processing_results=processing_results + ), + partial( + self.yaml_checks, + processing_results=processing_results, + commit_yaml=commit_yaml, + ), + partial(self.business_logic_checks, report_code=report_code), + ] + + # Loop through all the notification checks. Each function should return None unless + # it shouldn't notify, error or it needs to retry + for func in should_notify_check_functions: + result = func(commit=commit, log_extra_dict=log_extra_dict) + if result: + return result + + return ShouldCallNotifyResponse( + notification_result=ShouldCallNotifyResult.NOTIFY, + reason="successful_notification_scheduling", + message="Scheduling notify task", + ) + + def upload_processing_checks( + self, + commit: Commit, + processing_results: list[ProcessingResult], + log_extra_dict: dict, + ) -> Optional[ShouldCallNotifyResponse]: + # Checks if there is at least a success + processing_successes = [x["successful"] for x in processing_results] + if not any(processing_successes): + return ShouldCallNotifyResponse( + notification_result=ShouldCallNotifyResult.DO_NOT_NOTIFY, + reason="no_successful_processing", + message="No successful processing", + ) + + # Determine if there should be anything post_processing + repoid = commit.repoid + commitid = commit.commitid + state = ProcessingState(repoid, commitid) + if not should_trigger_postprocessing(state.get_upload_numbers()): + return ShouldCallNotifyResponse( + notification_result=ShouldCallNotifyResult.DO_NOT_NOTIFY, + reason="no_postprocessing_needed", + message="No need to trigger postprocessing", + ) + + # Determines if there are other uploads processing or incoming + redis_connection = get_redis_connection() + if self.has_upcoming_notifies_according_to_redis( + redis_connection, repoid, commitid + ): + log.info( + "Not notifying because there are seemingly other jobs being processed yet", + extra=log_extra_dict, + ) + return ShouldCallNotifyResponse( + notification_result=ShouldCallNotifyResult.DO_NOT_NOTIFY, + reason="has_other_notifications_coming", + message="Not notifying because there are seemingly other jobs being processed yet", + ) + + return None + + def yaml_checks( + self, + commit: Commit, + processing_results: list[ProcessingResult], + commit_yaml: UserYaml, + log_extra_dict: dict, + ) -> Optional[ShouldCallNotifyResponse]: + # Checks for manual trigger + manual_trigger = read_yaml_field( + commit_yaml, ("codecov", "notify", "manual_trigger") + ) + if manual_trigger: + log.info( + "Not scheduling notify because manual trigger is used", + extra=log_extra_dict, + ) + return ShouldCallNotifyResponse( + notification_result=ShouldCallNotifyResult.DO_NOT_NOTIFY, + reason="has_manual_trigger_yaml_setting", + message="Not scheduling notify because manual trigger is used", + ) + + # Checks for after_n_builds + after_n_builds = ( + read_yaml_field(commit_yaml, ("codecov", "notify", "after_n_builds")) or 0 + ) + if after_n_builds > 0: + report = ReportService(commit_yaml).get_existing_report_for_commit(commit) + number_sessions = len(report.sessions) if report is not None else 0 + if after_n_builds > number_sessions: + log.info( + f"Not scheduling notify because `after_n_builds` is {after_n_builds} and we only found {number_sessions} builds", + extra=log_extra_dict, + ) + return { + "notification_result": ShouldCallNotifyResult.DO_NOT_NOTIFY, + "reason": "has_after_n_builds_yaml_setting", + "message": f"Not scheduling notify because `after_n_builds` is {after_n_builds} and we only found {number_sessions} builds", + } + + # Checks for notify_error + notify_error = read_yaml_field( + commit_yaml, + ("codecov", "notify", "notify_error"), + _else=False, + ) + processing_successes = [x["successful"] for x in processing_results] + if notify_error and ( + len(processing_successes) == 0 or not all(processing_successes) + ): + log.info( + "Not scheduling notify because there is a non-successful processing result", + extra=log_extra_dict, + ) + return ShouldCallNotifyResponse( + notification_result=ShouldCallNotifyResult.NOTIFY_ERROR, + reason="has_notify_error_yaml_setting", + message="Not scheduling notify because there is a non-successful processing result", + ) + + # Creates repository_service and ci_results to use with yaml settings + try: + installation_name_to_use = get_installation_name_for_owner_for_task( + self.name, commit.repository.owner + ) + # This is technically a different repository_service method than the one used in the notify task. I + # replaced it because the other method is a) deeply intertwined with the notify file and b) it seems + # to be extra specific just for the notification aspect of it, so this should suffice for other checks. + # Please look corroborate this isn't a red flag. + repository_service = get_repo_provider_service(commit.repository) + except RepositoryWithoutValidBotError: + save_commit_error( + commit, error_code=CommitErrorTypes.REPO_BOT_INVALID.value + ) + log.warning( + "Unable to start notifications because repo doesn't have a valid bot", + extra=log_extra_dict, + ) + UploadFlow.log(UploadFlow.NOTIF_NO_VALID_INTEGRATION) + return ShouldCallNotifyResponse( + notification_result=ShouldCallNotifyResult.DO_NOT_NOTIFY, + reason="has_no_valid_bot", + message="Unable to start notifications because repo doesn't have a valid bot", + ) + except NoConfiguredAppsAvailable as exp: + if exp.rate_limited_count > 0: + # There's at least 1 app that we can use to communicate with GitHub, + # but this app happens to be rate limited now. We try again later. + # Min wait time of 1 minute + retry_delay_seconds = max(60, get_seconds_to_next_hour()) + log.warning( + "Unable to start notifications. Retrying again later.", + extra={ + **log_extra_dict, + "apps_available": exp.apps_count, + "apps_rate_limited": exp.rate_limited_count, + "apps_suspended": exp.suspended_count, + "countdown_seconds": retry_delay_seconds, + }, + ) + return ShouldCallNotifyResponse( + notification_result=ShouldCallNotifyResult.WAIT_TO_NOTIFY, + reason="retrying_because_app_is_rate_limited", + message="Unable to start notifications. Retrying again later.", + extra=dict( + max_retries=10, + countdown=retry_delay_seconds, + ), + ) + # Maybe we have apps that are suspended. We can't communicate with github. + log.warning( + "We can't find an app to communicate with GitHub. Not notifying.", + extra={ + **log_extra_dict, + "apps_available": exp.apps_count, + "apps_suspended": exp.suspended_count, + }, + ) + UploadFlow.log(UploadFlow.NOTIF_NO_APP_INSTALLATION) + return ShouldCallNotifyResponse( + notification_result=ShouldCallNotifyResult.DO_NOT_NOTIFY, + reason="no_valid_github_app_found", + message="We can't find an app to communicate with GitHub. Not notifying.", + ) + + try: + ci_results = self.fetch_and_update_whether_ci_passed( + repository_service, commit, commit_yaml + ) + except TorngitClientError as ex: + log.info( + "Unable to fetch CI results due to a client problem. Not notifying user", + extra={ + **log_extra_dict, + "code": ex.code, + }, + ) + UploadFlow.log(UploadFlow.NOTIF_GIT_CLIENT_ERROR) + return ShouldCallNotifyResponse( + notification_result=ShouldCallNotifyResult.DO_NOT_NOTIFY, + reason="not_able_fetch_ci_result", + message="Unable to fetch CI results due to a client problem. Not notifying user", + ) + except TorngitServerFailureError: + log.info( + "Unable to fetch CI results due to server issues. Not notifying user", + extra=log_extra_dict, + ) + UploadFlow.log(UploadFlow.NOTIF_GIT_SERVICE_ERROR) + return ShouldCallNotifyResponse( + notification_result=ShouldCallNotifyResult.DO_NOT_NOTIFY, + reason="server_issues_ci_result", + message="Unable to fetch CI results due to a client problem. Not notifying user", + ) + + # Check for wait_for_ci based on the CI results and reattempt if true + wait_for_ci = read_yaml_field( + commit_yaml, ("codecov", "notify", "wait_for_ci"), True + ) + if wait_for_ci and ci_results is None: + log.info( + "Not sending notifications yet because we are waiting for CI to finish. Attempting retry", + extra=log_extra_dict, + ) + ghapp_default_installations = list( + filter( + lambda obj: obj.name == installation_name_to_use + and obj.is_configured(), + commit.repository.owner.github_app_installations or [], + ) + ) + rely_on_webhook_ghapp = ghapp_default_installations != [] and any( + obj.is_repo_covered_by_integration(commit.repository) + for obj in ghapp_default_installations + ) + rely_on_webhook_legacy = commit.repository.using_integration + if ( + rely_on_webhook_ghapp + or rely_on_webhook_legacy + or commit.repository.hookid + ): + # rely on the webhook, but still retry in case we miss the webhook + max_retries = 5 + countdown = (60 * 3) * 2**self.request.retries + else: + max_retries = 10 + countdown = 15 * 2**self.request.retries + return ShouldCallNotifyResponse( + notification_result=ShouldCallNotifyResult.WAIT_TO_NOTIFY, + reason="retrying_because_wait_for_ci", + message="Not sending notifications yet because we are waiting for CI to finish. Attempting retry", + extra=dict( + max_retries=max_retries, + countdown=countdown, + ), + ) + + # Check for require_ci_to_pass if ci_results is false + require_ci_to_pass = read_yaml_field( + commit_yaml, ("codecov", "require_ci_to_pass"), True + ) + if require_ci_to_pass and ci_results is False: + log.info( + "Not sending notifications because CI failed", extra=log_extra_dict + ) + return ShouldCallNotifyResponse( + notification_result=ShouldCallNotifyResult.NOTIFY_ERROR, + reason="has_require_ci_to_pass_yaml_setting_and_no_ci_results", + message="Not sending notifications because CI failed", + ) + return None + + def business_logic_checks( + self, commit: Commit, log_extra_dict: dict, report_code: str | None = None + ): + # Notifications should be off in case of local uploads, and report code wouldn't be null in that case + if report_code is not None: + log.info( + "Not scheduling notify because it's a local upload", + extra=log_extra_dict, + ) + return ShouldCallNotifyResponse( + notification_result=ShouldCallNotifyResult.DO_NOT_NOTIFY, + reason="has_require_ci_to_pass_yaml_setting_and_no_ci_results", + message="Not scheduling notify because it's a local upload", + ) + + # Some check on CI skipping from some regex? Idk what this is + if regexp_ci_skip.search(commit.message or ""): + commit.state = "skipped" + log.info( + "Not scheduling notify because regex wants to skip ci", + extra=log_extra_dict, + ) + return ShouldCallNotifyResponse( + notification_result=ShouldCallNotifyResult.DO_NOT_NOTIFY, + reason="has_has_regexp_ci_skip", + message="Not scheduling notify because regex wants to skip ci", + ) + return None + + def orchestrator_side_effects( + self, + db_session: Session, + commit: Commit, + ): + if commit.pullid: + repoid = commit.repoid + pull = ( + db_session.query(Pull) + .filter_by(repoid=repoid, pullid=commit.pullid) + .first() + ) + if pull: + head = pull.get_head_commit() + if head is None or head.timestamp <= commit.timestamp: + pull.head = commit.commitid + if pull.head == commit.commitid: + db_session.commit() + self.app.tasks[pulls_task_name].apply_async( + kwargs=dict( + repoid=repoid, + pullid=pull.pullid, + should_send_notifications=False, + ) + ) + compared_to = pull.get_comparedto_commit() + if compared_to: + comparison = get_or_create_comparison( + db_session, compared_to, commit + ) + db_session.commit() + self.app.tasks[compute_comparison_task_name].apply_async( + kwargs=dict(comparison_id=comparison.id) + ) + + def _attempt_retry( + self, max_retries: int, countdown: int, log_extra_dict: dict + ) -> None: + try: + self.retry(max_retries=max_retries, countdown=countdown) + except MaxRetriesExceededError: + log.warning( + "Not attempting to retry notifications since we already retried too many times", + extra={ + **log_extra_dict, + "max_retries": max_retries, + "next_countdown_would_be": countdown, + }, + ) + UploadFlow.log(UploadFlow.NOTIF_TOO_MANY_RETRIES) + return { + "notification_result": ShouldCallNotifyResult.DO_NOT_NOTIFY, + "reason": "too_many_retries", + "message": "Not attempting to retry notifications since we already retried too many times", + } + + def has_upcoming_notifies_according_to_redis( + self, redis_connection: Redis, repoid: int, commitid: str + ) -> bool: + """Checks whether there are any jobs processing according to Redis right now and, + therefore, whether more up-to-date notifications will come after this anyway + + It's very important to have this code be conservative against saying + there are upcoming notifies already. The point of this code is to + avoid extra notifications for efficiency purposes, but it is better + to send extra notifications than to lack notifications + + Args: + redis_connection (Redis): The redis connection we check against + repoid (int): The repoid of the commit + commitid (str): The commitid of the commit + """ + upload_processing_lock_name = UPLOAD_PROCESSING_LOCK_NAME(repoid, commitid) + if redis_connection.get(upload_processing_lock_name): + return True + return False + + +RegisteredNotificationOrchestratorTask = celery_app.register_task( + NotificationOrchestratorTask() +) +notification_orchestrator_task = celery_app.tasks[ + RegisteredNotificationOrchestratorTask.name +] diff --git a/tasks/notify.py b/tasks/notify.py index 927cdec5f..419983e46 100644 --- a/tasks/notify.py +++ b/tasks/notify.py @@ -18,22 +18,19 @@ from shared.django_apps.codecov_auth.models import Service from shared.reports.readonly import ReadOnlyReport from shared.torngit.base import TokenType, TorngitBaseAdapter -from shared.torngit.exceptions import TorngitClientError, TorngitServerFailureError from shared.typings.torngit import OwnerInfo, RepoInfo, TorngitInstanceData from shared.yaml import UserYaml from sqlalchemy import and_ from sqlalchemy.orm.session import Session from app import celery_app -from database.enums import CommitErrorTypes, Decoration, NotificationState, ReportType +from database.enums import Decoration, NotificationState, ReportType from database.models import Commit, Pull from database.models.core import GITHUB_APP_INSTALLATION_DEFAULT_NAME, CompareCommit from helpers.checkpoint_logger.flows import UploadFlow -from helpers.clock import get_seconds_to_next_hour from helpers.comparison import minimal_totals -from helpers.exceptions import NoConfiguredAppsAvailable, RepositoryWithoutValidBotError +from helpers.exceptions import RepositoryWithoutValidBotError from helpers.github_installation import get_installation_name_for_owner_for_task -from helpers.save_commit_error import save_commit_error from services.activation import activate_user from services.commit_status import RepositoryCIFilter from services.comparison import ( @@ -46,7 +43,6 @@ from services.github import get_github_app_for_commit, set_github_app_for_commit from services.lock_manager import LockManager, LockRetry, LockType from services.notification import NotificationService -from services.redis import Redis, get_redis_connection from services.report import ReportService from services.repository import ( EnrichedPull, @@ -56,7 +52,6 @@ ) from services.yaml import get_current_yaml, read_yaml_field from tasks.base import BaseCodecovTask -from tasks.upload_processor import UPLOAD_PROCESSING_LOCK_NAME log = logging.getLogger(__name__) @@ -74,21 +69,6 @@ def run_impl( empty_upload=None, **kwargs, ): - redis_connection = get_redis_connection() - if self.has_upcoming_notifies_according_to_redis( - redis_connection, repoid, commitid - ): - log.info( - "Not notifying because there are seemingly other jobs being processed yet", - extra=dict(repoid=repoid, commitid=commitid), - ) - self.log_checkpoint(UploadFlow.SKIPPING_NOTIFICATION) - return { - "notified": False, - "notifications": None, - "reason": "has_other_notifies_coming", - } - lock_manager = LockManager( repoid=repoid, commitid=commitid, @@ -201,6 +181,8 @@ def run_impl_within_lock( "reason": "test_failures", } + # ASK: this installation/bot related logic impedes notification if unavailable, but it + # seems correct for it to be part of the notifier task, thoughts? try: installation_name_to_use = get_installation_name_for_owner_for_task( self.name, commit.repository.owner @@ -208,242 +190,133 @@ def run_impl_within_lock( repository_service = get_repo_provider_service_for_specific_commit( commit, installation_name_to_use ) + # The NoConfiguredAppsAvailable exception was moved to the notification orchestrator task + # as once it gets to the notifications it should be cleared of that exception. In theory + # the RepositoryWithoutValidBotError exception could also be moved except RepositoryWithoutValidBotError: - save_commit_error( - commit, error_code=CommitErrorTypes.REPO_BOT_INVALID.value - ) - log.warning( "Unable to start notifications because repo doesn't have a valid bot", extra=dict(repoid=repoid, commit=commitid), ) self.log_checkpoint(UploadFlow.NOTIF_NO_VALID_INTEGRATION) return {"notified": False, "notifications": None, "reason": "no_valid_bot"} - except NoConfiguredAppsAvailable as exp: - if exp.rate_limited_count > 0: - # There's at least 1 app that we can use to communicate with GitHub, - # but this app happens to be rate limited now. We try again later. - # Min wait time of 1 minute - retry_delay_seconds = max(60, get_seconds_to_next_hour()) - log.warning( - "Unable to start notifications. Retrying again later.", - extra=dict( - repoid=repoid, - commit=commitid, - apps_available=exp.apps_count, - apps_rate_limited=exp.rate_limited_count, - apps_suspended=exp.suspended_count, - countdown_seconds=retry_delay_seconds, - ), - ) - return self._attempt_retry( - max_retries=10, - countdown=retry_delay_seconds, - current_yaml=current_yaml, - commit=commit, - **kwargs, - ) - # Maybe we have apps that are suspended. We can't communicate with github. - log.warning( - "We can't find an app to communicate with GitHub. Not notifying.", - extra=dict( - repoid=repoid, - commit=commitid, - apps_available=exp.apps_count, - apps_suspended=exp.suspended_count, - ), - ) - self.log_checkpoint(UploadFlow.NOTIF_NO_APP_INSTALLATION) - return { - "notified": False, - "notifications": None, - "reason": "no_valid_github_app_found", - } if current_yaml is None: current_yaml = async_to_sync(get_current_yaml)(commit, repository_service) else: current_yaml = UserYaml.from_dict(current_yaml) - try: - ci_results = self.fetch_and_update_whether_ci_passed( - repository_service, commit, current_yaml - ) - except TorngitClientError as ex: - log.info( - "Unable to fetch CI results due to a client problem. Not notifying user", - extra=dict(repoid=commit.repoid, commit=commit.commitid, code=ex.code), - ) - self.log_checkpoint(UploadFlow.NOTIF_GIT_CLIENT_ERROR) - return { - "notified": False, - "notifications": None, - "reason": "not_able_fetch_ci_result", - } - except TorngitServerFailureError: - log.info( - "Unable to fetch CI results due to server issues. Not notifying user", - extra=dict(repoid=commit.repoid, commit=commit.commitid), - ) - self.log_checkpoint(UploadFlow.NOTIF_GIT_SERVICE_ERROR) - return { - "notified": False, - "notifications": None, - "reason": "server_issues_ci_result", - } - if self.should_wait_longer(current_yaml, commit, ci_results): - log.info( - "Not sending notifications yet because we are waiting for CI to finish", - extra=dict(repoid=commit.repoid, commit=commit.commitid), - ) - ghapp_default_installations = list( - filter( - lambda obj: obj.name == installation_name_to_use - and obj.is_configured(), - commit.repository.owner.github_app_installations or [], - ) - ) - rely_on_webhook_ghapp = ghapp_default_installations != [] and any( - obj.is_repo_covered_by_integration(commit.repository) - for obj in ghapp_default_installations - ) - rely_on_webhook_legacy = commit.repository.using_integration - if ( - rely_on_webhook_ghapp - or rely_on_webhook_legacy - or commit.repository.hookid - ): - # rely on the webhook, but still retry in case we miss the webhook - max_retries = 5 - countdown = (60 * 3) * 2**self.request.retries - else: - max_retries = 10 - countdown = 15 * 2**self.request.retries - return self._attempt_retry( - max_retries=max_retries, - countdown=countdown, - current_yaml=current_yaml, - commit=commit, - **kwargs, - ) - report_service = ReportService( current_yaml, gh_app_installation_name=installation_name_to_use ) head_report = report_service.get_existing_report_for_commit( commit, report_class=ReadOnlyReport ) - if self.should_send_notifications( - current_yaml, commit, ci_results, head_report - ): - enriched_pull = async_to_sync( - fetch_and_update_pull_request_information_from_commit - )(repository_service, commit, current_yaml) - if enriched_pull and enriched_pull.database_pull: - pull = enriched_pull.database_pull - base_commit = self.fetch_pull_request_base(pull) - else: - pull = None - base_commit = self.fetch_parent(commit) - - if ( - enriched_pull - and not self.send_notifications_if_commit_differs_from_pulls_head( - commit, enriched_pull, current_yaml - ) - and empty_upload is None - ): - log.info( - "Not sending notifications for commit when it differs from pull's most recent head", - extra=dict( - commit=commit.commitid, - repoid=commit.repoid, - current_yaml=current_yaml.to_dict(), - pull_head=enriched_pull.provider_pull["head"]["commitid"], - ), - ) - self.log_checkpoint(UploadFlow.NOTIF_STALE_HEAD) - return { - "notified": False, - "notifications": None, - "reason": "User doesnt want notifications warning them that current head differs from pull request most recent head.", - } - - if base_commit is not None: - base_report = report_service.get_existing_report_for_commit( - base_commit, report_class=ReadOnlyReport - ) - else: - base_report = None - if head_report is None and empty_upload is None: - self.log_checkpoint(UploadFlow.NOTIF_ERROR_NO_REPORT) - return { - "notified": False, - "notifications": None, - "reason": "no_head_report", - } - - if commit.repository.service == "gitlab": - gitlab_extra_shas_to_notify = self.get_gitlab_extra_shas_to_notify( - commit, repository_service - ) - else: - gitlab_extra_shas_to_notify = None + enriched_pull = async_to_sync( + fetch_and_update_pull_request_information_from_commit + )(repository_service, commit, current_yaml) + if enriched_pull and enriched_pull.database_pull: + pull = enriched_pull.database_pull + base_commit = self.fetch_pull_request_base(pull) + else: + pull = None + base_commit = self.fetch_parent(commit) + + if ( + enriched_pull + # TODO: I believe this should be something we extract to the notification orchestrator + # but it seems to intertwined with the logic here. The enriched_pull call would need to be + # cached before moving that to the orchestrator to avoid redundant API calls + and not self.send_notifications_if_commit_differs_from_pulls_head( + commit, enriched_pull, current_yaml + ) + and empty_upload is None + ): log.info( - "We are going to be sending notifications", + "Not sending notifications for commit when it differs from pull's most recent head", extra=dict( commit=commit.commitid, repoid=commit.repoid, current_yaml=current_yaml.to_dict(), + pull_head=enriched_pull.provider_pull["head"]["commitid"], ), ) - notifications = self.submit_third_party_notifications( - current_yaml, - base_commit, - commit, - base_report, - head_report, - enriched_pull, - repository_service, - empty_upload, - all_tests_passed=( - test_result_commit_report is not None - and test_result_commit_report.test_result_totals is not None - and test_result_commit_report.test_result_totals.error is None - and test_result_commit_report.test_result_totals.failed == 0 - ), - test_results_error=( - test_result_commit_report is not None - and test_result_commit_report.test_result_totals is not None - and test_result_commit_report.test_result_totals.error - ), - installation_name_to_use=installation_name_to_use, - gh_is_using_codecov_commenter=self.is_using_codecov_commenter( - repository_service - ), - gitlab_extra_shas_to_notify=gitlab_extra_shas_to_notify, - ) - self.log_checkpoint(UploadFlow.NOTIFIED) - log.info( - "Notifications done", - extra=dict( - notifications=notifications, - notification_count=len(notifications), - commit=commit.commitid, - repoid=commit.repoid, - pullid=pull.pullid if pull is not None else None, - ), + self.log_checkpoint(UploadFlow.NOTIF_STALE_HEAD) + return { + "notified": False, + "notifications": None, + "reason": "User doesnt want notifications warning them that current head differs from pull request most recent head.", + } + + if base_commit is not None: + base_report = report_service.get_existing_report_for_commit( + base_commit, report_class=ReadOnlyReport ) - db_session.commit() - return {"notified": True, "notifications": notifications} else: - log.info( - "Not sending notifications at all", - extra=dict(commit=commit.commitid, repoid=commit.repoid), + base_report = None + if head_report is None and empty_upload is None: + self.log_checkpoint(UploadFlow.NOTIF_ERROR_NO_REPORT) + return { + "notified": False, + "notifications": None, + "reason": "no_head_report", + } + + if commit.repository.service == "gitlab": + gitlab_extra_shas_to_notify = self.get_gitlab_extra_shas_to_notify( + commit, repository_service ) - self.log_checkpoint(UploadFlow.SKIPPING_NOTIFICATION) - return {"notified": False, "notifications": None} + else: + gitlab_extra_shas_to_notify = None + + log.info( + "We are going to be sending notifications", + extra=dict( + commit=commit.commitid, + repoid=commit.repoid, + current_yaml=current_yaml.to_dict(), + ), + ) + notifications = self.submit_third_party_notifications( + current_yaml, + base_commit, + commit, + base_report, + head_report, + enriched_pull, + repository_service, + empty_upload, + all_tests_passed=( + test_result_commit_report is not None + and test_result_commit_report.test_result_totals is not None + and test_result_commit_report.test_result_totals.error is None + and test_result_commit_report.test_result_totals.failed == 0 + ), + test_results_error=( + test_result_commit_report is not None + and test_result_commit_report.test_result_totals is not None + and test_result_commit_report.test_result_totals.error + ), + installation_name_to_use=installation_name_to_use, + gh_is_using_codecov_commenter=self.is_using_codecov_commenter( + repository_service + ), + gitlab_extra_shas_to_notify=gitlab_extra_shas_to_notify, + ) + self.log_checkpoint(UploadFlow.NOTIFIED) + log.info( + "Notifications done", + extra=dict( + notifications=notifications, + notification_count=len(notifications), + commit=commit.commitid, + repoid=commit.repoid, + pullid=pull.pullid if pull is not None else None, + ), + ) + db_session.commit() + return {"notified": True, "notifications": notifications} def is_using_codecov_commenter( self, repository_service: TorngitBaseAdapter @@ -461,27 +334,6 @@ def is_using_codecov_commenter( == commenter_bot_token ) - def has_upcoming_notifies_according_to_redis( - self, redis_connection: Redis, repoid: int, commitid: str - ) -> bool: - """Checks whether there are any jobs processing according to Redis right now and, - therefore, whether more up-to-date notifications will come after this anyway - - It's very important to have this code be conservative against saying - there are upcoming notifies already. The point of this code is to - avoid extra notifications for efficiency purposes, but it is better - to send extra notifications than to lack notifications - - Args: - redis_connection (Redis): The redis connection we check against - repoid (int): The repoid of the commit - commitid (str): The commitid of the commit - """ - upload_processing_lock_name = UPLOAD_PROCESSING_LOCK_NAME(repoid, commitid) - if redis_connection.get(upload_processing_lock_name): - return True - return False - @sentry_sdk.trace def save_patch_totals(self, comparison: ComparisonProxy) -> None: """Saves patch coverage to the CompareCommit, if it exists. diff --git a/tasks/upload_finisher.py b/tasks/upload_finisher.py index 91968617b..7f1404060 100644 --- a/tasks/upload_finisher.py +++ b/tasks/upload_finisher.py @@ -1,17 +1,13 @@ import logging import random -import re from datetime import datetime, timedelta, timezone -from enum import Enum import sentry_sdk from asgiref.sync import async_to_sync from redis.exceptions import LockError from redis.lock import Lock from shared.celery_config import ( - compute_comparison_task_name, - notify_task_name, - pulls_task_name, + notification_orchestrator_task_name, timeseries_save_commit_measurements_task_name, upload_finisher_task_name, ) @@ -20,40 +16,29 @@ from shared.yaml import UserYaml from app import celery_app -from celery_config import notify_error_task_name from database.enums import CommitErrorTypes -from database.models import Commit, Pull +from database.models import Commit from database.models.core import GITHUB_APP_INSTALLATION_DEFAULT_NAME from helpers.cache import cache from helpers.checkpoint_logger.flows import UploadFlow from helpers.exceptions import RepositoryWithoutValidBotError from helpers.github_installation import get_installation_name_for_owner_for_task from helpers.save_commit_error import save_commit_error -from services.comparison import get_or_create_comparison from services.processing.intermediate import ( cleanup_intermediate_reports, load_intermediate_reports, ) from services.processing.merging import merge_reports, update_uploads -from services.processing.state import ProcessingState, should_trigger_postprocessing +from services.processing.state import ProcessingState from services.processing.types import ProcessingResult from services.redis import get_redis_connection from services.report import ReportService from services.repository import get_repo_provider_service -from services.yaml import read_yaml_field from tasks.base import BaseCodecovTask from tasks.upload_processor import MAX_RETRIES, UPLOAD_PROCESSING_LOCK_NAME log = logging.getLogger(__name__) -regexp_ci_skip = re.compile(r"\[(ci|skip| |-){3,}\]") - - -class ShouldCallNotifyResult(Enum): - DO_NOT_NOTIFY = "do_not_notify" - NOTIFY_ERROR = "notify_error" - NOTIFY = "notify" - class UploadFinisherTask(BaseCodecovTask, name=upload_finisher_task_name): """This is the third task of the series of tasks designed to process an `upload` made @@ -64,8 +49,8 @@ class UploadFinisherTask(BaseCodecovTask, name=upload_finisher_task_name): This task does the finishing steps after a group of uploads is processed The steps are: - - Schedule the set_pending task, depending on the case - - Schedule notification tasks, depending on the case + - Merging reports into one report for GCP and update Upload records with status + - Schedule notification orchestrator task - Invalidating whatever cache is done """ @@ -124,7 +109,6 @@ def run_impl( db_session.commit() state.mark_uploads_as_merged(upload_ids) - except LockError: max_retry = 200 * 3**self.request.retries retry_in = min(random.randint(max_retry // 2, max_retry), 60 * 60 * 5) @@ -136,21 +120,24 @@ def run_impl( cleanup_intermediate_reports(upload_ids) - if not should_trigger_postprocessing(state.get_upload_numbers()): - UploadFlow.log(UploadFlow.PROCESSING_COMPLETE) - UploadFlow.log(UploadFlow.SKIPPING_NOTIFICATION) - return - + # Call the notification orchestrator task lock_name = f"upload_finisher_lock_{repoid}_{commitid}" redis_connection = get_redis_connection() try: with redis_connection.lock(lock_name, timeout=60 * 5, blocking_timeout=5): - result = self.finish_reports_processing( - db_session, - commit, - commit_yaml, - processing_results, - report_code, + notification_orchestrator_kwargs = { + "repoid": repoid, + "commitid": commitid, + "commit_yaml": commit_yaml.to_dict(), + "processing_results": processing_results, + "report_code": report_code, + } + notification_orchestrator_kwargs = UploadFlow.save_to_kwargs( + notification_orchestrator_kwargs + ) + # TODO: add log to add the notification orchestrator task + self.app.tasks[notification_orchestrator_task_name].apply_async( + kwargs=notification_orchestrator_kwargs ) self.app.tasks[ timeseries_save_commit_measurements_task_name @@ -166,182 +153,14 @@ def run_impl( db_session.commit() self.invalidate_caches(redis_connection, commit) + log.info("Finished upload_finisher task") - return result + UploadFlow.log(UploadFlow.PROCESSING_COMPLETE) + return {"result": "finisher done"} except LockError: log.warning("Unable to acquire lock", extra=dict(lock_name=lock_name)) UploadFlow.log(UploadFlow.FINISHER_LOCK_ERROR) - def finish_reports_processing( - self, - db_session, - commit: Commit, - commit_yaml: UserYaml, - processing_results: list[ProcessingResult], - report_code, - ): - log.debug("In finish_reports_processing for commit: %s" % commit) - commitid = commit.commitid - repoid = commit.repoid - - # always notify, let the notify handle if it should submit - notifications_called = False - if not regexp_ci_skip.search(commit.message or ""): - match self.should_call_notifications( - commit, commit_yaml, processing_results, report_code - ): - case ShouldCallNotifyResult.NOTIFY: - notifications_called = True - notify_kwargs = { - "repoid": repoid, - "commitid": commitid, - "current_yaml": commit_yaml.to_dict(), - } - notify_kwargs = UploadFlow.save_to_kwargs(notify_kwargs) - task = self.app.tasks[notify_task_name].apply_async( - kwargs=notify_kwargs - ) - log.info( - "Scheduling notify task", - extra=dict( - repoid=repoid, - commit=commitid, - commit_yaml=commit_yaml.to_dict(), - processing_results=processing_results, - notify_task_id=task.id, - parent_task=self.request.parent_id, - ), - ) - if commit.pullid: - pull = ( - db_session.query(Pull) - .filter_by(repoid=commit.repoid, pullid=commit.pullid) - .first() - ) - if pull: - head = pull.get_head_commit() - if head is None or head.timestamp <= commit.timestamp: - pull.head = commit.commitid - if pull.head == commit.commitid: - db_session.commit() - self.app.tasks[pulls_task_name].apply_async( - kwargs=dict( - repoid=repoid, - pullid=pull.pullid, - should_send_notifications=False, - ) - ) - compared_to = pull.get_comparedto_commit() - if compared_to: - comparison = get_or_create_comparison( - db_session, compared_to, commit - ) - db_session.commit() - self.app.tasks[ - compute_comparison_task_name - ].apply_async( - kwargs=dict(comparison_id=comparison.id) - ) - case ShouldCallNotifyResult.DO_NOT_NOTIFY: - notifications_called = False - log.info( - "Skipping notify task", - extra=dict( - repoid=repoid, - commit=commitid, - commit_yaml=commit_yaml.to_dict(), - processing_results=processing_results, - parent_task=self.request.parent_id, - ), - ) - case ShouldCallNotifyResult.NOTIFY_ERROR: - notifications_called = False - notify_error_kwargs = { - "repoid": repoid, - "commitid": commitid, - "current_yaml": commit_yaml.to_dict(), - } - notify_error_kwargs = UploadFlow.save_to_kwargs(notify_error_kwargs) - task = self.app.tasks[notify_error_task_name].apply_async( - kwargs=notify_error_kwargs - ) - else: - commit.state = "skipped" - - UploadFlow.log(UploadFlow.PROCESSING_COMPLETE) - if not notifications_called: - UploadFlow.log(UploadFlow.SKIPPING_NOTIFICATION) - - return {"notifications_called": notifications_called} - - def should_call_notifications( - self, - commit: Commit, - commit_yaml: UserYaml, - processing_results: list[ProcessingResult], - report_code, - ) -> ShouldCallNotifyResult: - extra_dict = { - "repoid": commit.repoid, - "commitid": commit.commitid, - "commit_yaml": commit_yaml, - "processing_results": processing_results, - "report_code": report_code, - "parent_task": self.request.parent_id, - } - - manual_trigger = read_yaml_field( - commit_yaml, ("codecov", "notify", "manual_trigger") - ) - if manual_trigger: - log.info( - "Not scheduling notify because manual trigger is used", - extra=extra_dict, - ) - return ShouldCallNotifyResult.DO_NOT_NOTIFY - # Notifications should be off in case of local uploads, and report code wouldn't be null in that case - if report_code is not None: - log.info( - "Not scheduling notify because it's a local upload", - extra=extra_dict, - ) - return ShouldCallNotifyResult.DO_NOT_NOTIFY - - after_n_builds = ( - read_yaml_field(commit_yaml, ("codecov", "notify", "after_n_builds")) or 0 - ) - if after_n_builds > 0: - report = ReportService(commit_yaml).get_existing_report_for_commit(commit) - number_sessions = len(report.sessions) if report is not None else 0 - if after_n_builds > number_sessions: - log.info( - "Not scheduling notify because `after_n_builds` is %s and we only found %s builds", - after_n_builds, - number_sessions, - extra=extra_dict, - ) - return ShouldCallNotifyResult.DO_NOT_NOTIFY - - processing_successses = [x["successful"] for x in processing_results] - - if read_yaml_field( - commit_yaml, - ("codecov", "notify", "notify_error"), - _else=False, - ): - if len(processing_successses) == 0 or not all(processing_successses): - log.info( - "Not scheduling notify because there is a non-successful processing result", - extra=extra_dict, - ) - - return ShouldCallNotifyResult.NOTIFY_ERROR - else: - if not any(processing_successses): - return ShouldCallNotifyResult.DO_NOT_NOTIFY - - return ShouldCallNotifyResult.NOTIFY - def invalidate_caches(self, redis_connection, commit: Commit): redis_connection.delete("cache/{}/tree/{}".format(commit.repoid, commit.branch)) redis_connection.delete( @@ -355,10 +174,6 @@ def invalidate_caches(self, redis_connection, commit: Commit): redis_connection.hdel("badge", ("%s:" % key).lower()) -RegisteredUploadTask = celery_app.register_task(UploadFinisherTask()) -upload_finisher_task = celery_app.tasks[RegisteredUploadTask.name] - - def get_report_lock(repoid: int, commitid: str, hard_time_limit: int) -> Lock: lock_name = UPLOAD_PROCESSING_LOCK_NAME(repoid, commitid) redis_connection = get_redis_connection() @@ -433,10 +248,13 @@ def load_commit_diff(commit: Commit, task_name: str | None = None) -> dict | Non commit, error_code=CommitErrorTypes.REPO_BOT_INVALID.value, ) - log.warning( "Could not apply diff to report because there is no valid bot found for that repo", exc_info=True, ) return None + + +RegisteredUploadTask = celery_app.register_task(UploadFinisherTask()) +upload_finisher_task = celery_app.tasks[RegisteredUploadTask.name]