From 092a0b68fe1d50dc53dab9d2c5c4dcde2850535c Mon Sep 17 00:00:00 2001 From: Adrian Date: Thu, 2 Jan 2025 12:57:20 -0600 Subject: [PATCH 1/3] WIP --- services/lock_manager.py | 1 + tasks/__init__.py | 1 + tasks/notification_orchestrator.py | 1244 ++++++++++++++++++++++++++++ tasks/notify.py | 397 +++++---- tasks/upload_finisher.py | 230 +---- 5 files changed, 1456 insertions(+), 417 deletions(-) create mode 100644 tasks/notification_orchestrator.py diff --git a/services/lock_manager.py b/services/lock_manager.py index 0cb691cf8..618443edd 100644 --- a/services/lock_manager.py +++ b/services/lock_manager.py @@ -17,6 +17,7 @@ class LockType(Enum): BUNDLE_ANALYSIS_PROCESSING = "bundle_analysis_processing" BUNDLE_ANALYSIS_NOTIFY = "bundle_analysis_notify" NOTIFICATION = "notify" + NOTIFICATION_ORCHESTRATOR = "notification_orchestrator" # TODO: port existing task locking to use `LockManager` diff --git a/tasks/__init__.py b/tasks/__init__.py index 7c9c39823..b9c0761b1 100644 --- a/tasks/__init__.py +++ b/tasks/__init__.py @@ -32,6 +32,7 @@ from tasks.manual_trigger import manual_trigger_task from tasks.new_user_activated import new_user_activated_task from tasks.notify import notify_task +from tasks.notification_orchestrator import notification_orchestrator_task from tasks.notify_error import notify_error_task from tasks.plan_manager_task import daily_plan_manager_task_name from tasks.preprocess_upload import preprocess_upload_task diff --git a/tasks/notification_orchestrator.py b/tasks/notification_orchestrator.py new file mode 100644 index 000000000..8aba15d2a --- /dev/null +++ b/tasks/notification_orchestrator.py @@ -0,0 +1,1244 @@ +import logging +from typing import Optional + +import sentry_sdk +import re +from asgiref.sync import async_to_sync +from celery.exceptions import MaxRetriesExceededError, SoftTimeLimitExceeded +from celery_config import notify_error_task_name +from shared.bots.github_apps import ( + get_github_app_token, + get_specific_github_app_details, +) +from shared.celery_config import ( + activate_account_user_task_name, + new_user_activated_task_name, + notification_orchestrator_task_name, + status_set_error_task_name, + compute_comparison_task_name, + notify_task_name, + pulls_task_name, +) +from shared.config import get_config +from shared.django_apps.codecov_auth.models import Service +from shared.reports.readonly import ReadOnlyReport +from shared.torngit.base import TokenType, TorngitBaseAdapter +from shared.torngit.exceptions import TorngitClientError, TorngitServerFailureError +from shared.typings.torngit import OwnerInfo, RepoInfo, TorngitInstanceData +from shared.yaml import UserYaml +from sqlalchemy import and_ +from sqlalchemy.orm.session import Session +from enum import Enum + +from app import celery_app +from database.enums import CommitErrorTypes, Decoration, NotificationState, ReportType +from database.models import Commit, Pull +from database.models.core import GITHUB_APP_INSTALLATION_DEFAULT_NAME, CompareCommit +from helpers.checkpoint_logger.flows import UploadFlow +from helpers.clock import get_seconds_to_next_hour +from helpers.comparison import minimal_totals +from helpers.exceptions import NoConfiguredAppsAvailable, RepositoryWithoutValidBotError +from helpers.github_installation import get_installation_name_for_owner_for_task +from helpers.save_commit_error import save_commit_error +from services.activation import activate_user +from services.commit_status import RepositoryCIFilter +from services.comparison import ( + ComparisonContext, + 
ComparisonProxy, + get_or_create_comparison, +) +from services.comparison.types import Comparison, FullCommit +from services.decoration import determine_decoration_details +from services.github import get_github_app_for_commit, set_github_app_for_commit +from services.lock_manager import LockManager, LockRetry, LockType +from services.notification import NotificationService +from services.redis import Redis, get_redis_connection +from services.report import ReportService +from services.processing.types import ProcessingResult +from services.processing.state import ProcessingState, should_trigger_postprocessing +from services.repository import ( + EnrichedPull, + _get_repo_provider_service_instance, + fetch_and_update_pull_request_information_from_commit, + get_repo_provider_service, +) +from services.yaml import get_current_yaml, read_yaml_field +from tasks.base import BaseCodecovTask +from tasks.upload_processor import UPLOAD_PROCESSING_LOCK_NAME + +log = logging.getLogger(__name__) + +# This should decide if it should notify based on +# Retry if someone tries to access lock and it's busy +# Processing state from other uploads with the same repo/commit id combo +# Business logic +# Notify side-effects + # Call timeseries + # Call pull sync task + +# This needs to +# Have the same signature as the notification's task to keep notifications the same + +def has_upcoming_notifies_according_to_redis( + self, redis_connection: Redis, repoid: int, commitid: str +) -> bool: + """Checks whether there are any jobs processing according to Redis right now and, + therefore, whether more up-to-date notifications will come after this anyway + + It's very important to have this code be conservative against saying + there are upcoming notifies already. The point of this code is to + avoid extra notifications for efficiency purposes, but it is better + to send extra notifications than to lack notifications + + Args: + redis_connection (Redis): The redis connection we check against + repoid (int): The repoid of the commit + commitid (str): The commitid of the commit + """ + upload_processing_lock_name = UPLOAD_PROCESSING_LOCK_NAME(repoid, commitid) + if redis_connection.get(upload_processing_lock_name): + return True + return False + + +regexp_ci_skip = re.compile(r"\[(ci|skip| |-){3,}\]") + +class ShouldCallNotifyResult(Enum): + WAIT_TO_NOTIFY = "wait_to_notify" + DO_NOT_NOTIFY = "do_not_notify" + NOTIFY_ERROR = "notify_error" + NOTIFY = "notify" + +# TODO: Ask: does this need a throws = (SoftTimeLimitExceeded,)? 
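+# Rough sketch (illustrative only, names below are not part of the real task) of how each
+# ShouldCallNotifyResult value is expected to be acted on; it mirrors the match statement
+# further down and the `_attempt_retry` helper:
+#
+#   def handle_notify_result(task, result: ShouldCallNotifyResult):
+#       if result == ShouldCallNotifyResult.WAIT_TO_NOTIFY:
+#           # CI still pending or apps rate limited: schedule another attempt
+#           return task._attempt_retry(...)
+#       if result == ShouldCallNotifyResult.DO_NOT_NOTIFY:
+#           # skip quietly, but record it for the upload flow metrics
+#           UploadFlow.log(UploadFlow.SKIPPING_NOTIFICATION)
+#           return None
+#       if result == ShouldCallNotifyResult.NOTIFY_ERROR:
+#           return task.app.tasks[notify_error_task_name].apply_async(kwargs=...)
+#       if result == ShouldCallNotifyResult.NOTIFY:
+#           return task.app.tasks[notify_task_name].apply_async(kwargs=...)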
+class NotificationOrchestratorTask(BaseCodecovTask, name=notification_orchestrator_task_name): + def run_impl( + self, + db_session: Session, + *args, + repoid: int, + commitid: str, + commit_yaml: dict, + processing_results: list[ProcessingResult], + report_code: str | None = None, + empty_upload=None, + **kwargs, + ): + print("Helllllooooooo", repoid) + print("Helllllooooooo", commitid) + print("Helllllooooooo", processing_results) + print("Helllllooooooo", args) + print("Helllllooooooo", commit_yaml) + print("Helllllooooooo", empty_upload) + print("Helllllooooooo", kwargs) + + lock_manager = LockManager( + repoid=repoid, + commitid=commitid, + # Currently hardcoded to coverage, should be variable to the report type + report_type=ReportType.COVERAGE, + # Unsure if we need a specific timeout hard timeout for this task + lock_timeout=max(80, self.hard_time_limit_task), + ) + try: + lock_acquired = False + with lock_manager.locked( + lock_type=LockType.NOTIFICATION_ORCHESTRATOR, retry_num=self.request.retries + ): + lock_acquired = True + return self.run_impl_within_lock( + db_session, + repoid=repoid, + commitid=commitid, + commit_yaml=commit_yaml, + processing_results=processing_results, + report_code=report_code, + empty_upload=empty_upload, + **kwargs, + ) + # TODO: What should happen when there's a lock error? I think this should change to some retrying mechanism + # Should it be a LockRetry or a LockError? + except LockRetry as err: + ( + log.info( + "Not notifying because there is another notification already happening", + extra=dict( + repoid=repoid, + commitid=commitid, + error_type=type(err), + lock_acquired=lock_acquired, + ), + ), + ) + self.log_checkpoint(UploadFlow.NOTIF_LOCK_ERROR) + return { + "notified": False, + "notifications": None, + "reason": "unobtainable_lock", + } + + + + + # redis_connection = get_redis_connection() + # if self.has_upcoming_notifies_according_to_redis( + # redis_connection, repoid, commitid + # ): + # log.info( + # "Not notifying because there are seemingly other jobs being processed yet", + # extra=dict(repoid=repoid, commitid=commitid), + # ) + # self.log_checkpoint(UploadFlow.SKIPPING_NOTIFICATION) + # return { + # "notified": False, + # "notifications": None, + # "reason": "has_other_notifies_coming", + # } + + + def run_impl_within_lock( + self, + db_session: Session, + *args, + repoid: int, + commitid: str, + commit_yaml: dict, + processing_results: list[ProcessingResult], + report_code: str | None = None, + empty_upload=None, + **kwargs, + ): + # Should not notify based on yaml settings + # Should not notify based on type of upload (empty upload) + # Should not notify based on processing results + # Upload states + # Should not notify based on pending notifications + # Should not notify based on business logic + + # States + ### metrics log, log.info/warning. If success, side-effects + # Should not notify + # Should attempt to notify later + # Should notify + + ##### Start + redis_connection = get_redis_connection() + repoid = int(repoid) + commit_yaml = UserYaml(commit_yaml) + + commit = ( + db_session.query(Commit) + .filter(Commit.repoid == repoid, Commit.commitid == commitid) + .first() + ) + assert commit, "Commit not found in database." 
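+ # Input shape note (assumption, for readability): each entry of `processing_results`
+ # is treated as a mapping with at least a boolean "successful" key, e.g. roughly
+ #     {"upload_id": 1234, "successful": True}
+ # Only "successful" is read in this task (see `processing_successes` below); the
+ # authoritative shape is services.processing.types.ProcessingResult.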
+ repository = commit.repository + + state = ProcessingState(repoid, commitid) + processing_successes = [x["successful"] for x in processing_results] + notifications_called = False + + extra_dict = { + "repoid": commit.repoid, + "commitid": commit.commitid, + "commit_yaml": commit_yaml, + "processing_results": processing_results, + "report_code": report_code, + "parent_task": self.request.parent_id, + } + + ##### Checks other uploads and does things based on them + # Checks if there is at least a success + if not any(processing_successes): + return ShouldCallNotifyResult.DO_NOT_NOTIFY + + # Determine if there should be anything post_processing + if not should_trigger_postprocessing(state.get_upload_numbers()): + UploadFlow.log(UploadFlow.SKIPPING_NOTIFICATION) + return + + # Determines if there are other uploads processing or incoming + if has_upcoming_notifies_according_to_redis( + redis_connection, repoid, commitid + ): + log.info( + "Not notifying because there are seemingly other jobs being processed yet", + extra=dict(repoid=repoid, commitid=commitid), + ) + self.log_checkpoint(UploadFlow.SKIPPING_NOTIFICATION) + return { + "notified": False, + "notifications": None, + "reason": "has_other_notifies_coming", + } + + ##### Yaml settings checks + # Checks for manual trigger + manual_trigger = read_yaml_field( + commit_yaml, ("codecov", "notify", "manual_trigger") + ) + if manual_trigger: + log.info( + "Not scheduling notify because manual trigger is used", + extra=extra_dict, + ) + return ShouldCallNotifyResult.DO_NOT_NOTIFY + + # Checks for after_n_builds + after_n_builds = ( + read_yaml_field(commit_yaml, ("codecov", "notify", "after_n_builds")) or 0 + ) + if after_n_builds > 0: + report = ReportService(commit_yaml).get_existing_report_for_commit(commit) + number_sessions = len(report.sessions) if report is not None else 0 + if after_n_builds > number_sessions: + log.info( + "Not scheduling notify because `after_n_builds` is %s and we only found %s builds", + after_n_builds, + number_sessions, + extra=extra_dict, + ) + return ShouldCallNotifyResult.DO_NOT_NOTIFY + + # Checks for notify_error + notify_error = read_yaml_field( + commit_yaml, + ("codecov", "notify", "notify_error"), + _else=False, + ) + if notify_error and (len(processing_successes) == 0 or not all(processing_successes)): + log.info( + "Not scheduling notify because there is a non-successful processing result", + extra=extra_dict, + ) + return ShouldCallNotifyResult.NOTIFY_ERROR + + ### Checks related to repository_service and ci_results + # ASK: this installation/bot related logic impedes notification if unavailable, but it + # seems correct for it to be part of the notifier task, thoughts? + try: + installation_name_to_use = get_installation_name_for_owner_for_task( + self.name, commit.repository.owner + ) + # This is technically a different repository_service method than the one used in notifications. I + # replaced it because the other method is a) deeply intertwined with the notify file and b) it seems + # to be extra specific just for the notification aspect of it, so this should suffice for other checks. + # Please look corroborate this isn't a red flag. 
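+ # For comparison (based on the commented-out blocks kept below): the notify task resolves
+ # its provider per commit via get_repo_provider_service_for_specific_commit(commit,
+ # installation_name_to_use), while this path only needs the repository object. If the
+ # commit-specific app selection turns out to matter for these checks, this call may need
+ # to be revisited.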
+ repository_service = get_repo_provider_service(repository) + except RepositoryWithoutValidBotError: + save_commit_error( + commit, error_code=CommitErrorTypes.REPO_BOT_INVALID.value + ) + + log.warning( + "Unable to start notifications because repo doesn't have a valid bot", + extra=dict(repoid=repoid, commit=commitid), + ) + self.log_checkpoint(UploadFlow.NOTIF_NO_VALID_INTEGRATION) + return {"notified": False, "notifications": None, "reason": "no_valid_bot"} + except NoConfiguredAppsAvailable as exp: + if exp.rate_limited_count > 0: + # There's at least 1 app that we can use to communicate with GitHub, + # but this app happens to be rate limited now. We try again later. + # Min wait time of 1 minute + retry_delay_seconds = max(60, get_seconds_to_next_hour()) + log.warning( + "Unable to start notifications. Retrying again later.", + extra=dict( + repoid=repoid, + commit=commitid, + apps_available=exp.apps_count, + apps_rate_limited=exp.rate_limited_count, + apps_suspended=exp.suspended_count, + countdown_seconds=retry_delay_seconds, + ), + ) + return self._attempt_retry( + max_retries=10, + countdown=retry_delay_seconds, + current_yaml=commit_yaml, + commit=commit, + **kwargs, + ) + # Maybe we have apps that are suspended. We can't communicate with github. + log.warning( + "We can't find an app to communicate with GitHub. Not notifying.", + extra=dict( + repoid=repoid, + commit=commitid, + apps_available=exp.apps_count, + apps_suspended=exp.suspended_count, + ), + ) + self.log_checkpoint(UploadFlow.NOTIF_NO_APP_INSTALLATION) + return { + "notified": False, + "notifications": None, + "reason": "no_valid_github_app_found", + } + + try: + ci_results = self.fetch_and_update_whether_ci_passed( + repository_service, commit, commit_yaml + ) + except TorngitClientError as ex: + log.info( + "Unable to fetch CI results due to a client problem. Not notifying user", + extra=dict(repoid=commit.repoid, commit=commit.commitid, code=ex.code), + ) + self.log_checkpoint(UploadFlow.NOTIF_GIT_CLIENT_ERROR) + return { + "notified": False, + "notifications": None, + "reason": "not_able_fetch_ci_result", + } + except TorngitServerFailureError: + log.info( + "Unable to fetch CI results due to server issues. 
Not notifying user", + extra=dict(repoid=commit.repoid, commit=commit.commitid), + ) + self.log_checkpoint(UploadFlow.NOTIF_GIT_SERVICE_ERROR) + return { + "notified": False, + "notifications": None, + "reason": "server_issues_ci_result", + } + + # Check for wait_for_ci based on the CI results and reattempt if true + # should_wait_longer + wait_for_ci = read_yaml_field(commit_yaml, ("codecov", "notify", "wait_for_ci"), True) + if wait_for_ci and ci_results is None: + log.info( + "Not sending notifications yet because we are waiting for CI to finish", + extra=dict(repoid=commit.repoid, commit=commit.commitid), + ) + ghapp_default_installations = list( + filter( + lambda obj: obj.name == installation_name_to_use + and obj.is_configured(), + commit.repository.owner.github_app_installations or [], + ) + ) + rely_on_webhook_ghapp = ghapp_default_installations != [] and any( + obj.is_repo_covered_by_integration(commit.repository) + for obj in ghapp_default_installations + ) + rely_on_webhook_legacy = commit.repository.using_integration + if ( + rely_on_webhook_ghapp + or rely_on_webhook_legacy + or commit.repository.hookid + ): + # rely on the webhook, but still retry in case we miss the webhook + max_retries = 5 + countdown = (60 * 3) * 2**self.request.retries + else: + max_retries = 10 + countdown = 15 * 2**self.request.retries + return self._attempt_retry( + max_retries=max_retries, + countdown=countdown, + current_yaml=commit_yaml, + commit=commit, + **kwargs, + ) + + # should_send_notifications require_ci_to_pass + require_ci_to_pass = read_yaml_field(commit_yaml, ("codecov", "require_ci_to_pass"), True) + if require_ci_to_pass and ci_results is False: + self.app.tasks[status_set_error_task_name].apply_async( + args=None, + kwargs=dict( + repoid=commit.repoid, commitid=commit.commitid, message="CI failed." + ), + ) + log.info( + "Not sending notifications because CI failed", + extra=dict(repoid=commit.repoid, commit=commit.commitid), + ) + return False + + ##### Business logic + # Notifications should be off in case of local uploads, and report code wouldn't be null in that case + if report_code is not None: + log.info( + "Not scheduling notify because it's a local upload", + extra=extra_dict, + ) + return ShouldCallNotifyResult.DO_NOT_NOTIFY + + # Some check on CI skipping from some regex? 
Idk what this is + if regexp_ci_skip.search(commit.message or ""): + commit.state = "skipped" + + + # If it got here, it should notify + notifications_called = True + notify_kwargs = { + "repoid": repoid, + "commitid": commitid, + "current_yaml": commit_yaml.to_dict(), + } + notify_kwargs = UploadFlow.save_to_kwargs(notify_kwargs) + task = self.app.tasks[notify_task_name].apply_async( + kwargs=notify_kwargs + ) + log.info( + "Scheduling notify task", + extra=dict( + repoid=repoid, + commit=commitid, + commit_yaml=commit_yaml.to_dict(), + processing_results=processing_results, + notify_task_id=task.id, + parent_task=self.request.parent_id, + ), + ) + if commit.pullid: + pull = ( + db_session.query(Pull) + .filter_by(repoid=commit.repoid, pullid=commit.pullid) + .first() + ) + if pull: + head = pull.get_head_commit() + if head is None or head.timestamp <= commit.timestamp: + pull.head = commit.commitid + if pull.head == commit.commitid: + db_session.commit() + self.app.tasks[pulls_task_name].apply_async( + kwargs=dict( + repoid=repoid, + pullid=pull.pullid, + should_send_notifications=False, + ) + ) + compared_to = pull.get_comparedto_commit() + if compared_to: + comparison = get_or_create_comparison( + db_session, compared_to, commit + ) + db_session.commit() + self.app.tasks[ + compute_comparison_task_name + ].apply_async( + kwargs=dict(comparison_id=comparison.id) + ) + + + # # ASK: this installation/bot related logic impedes notification if unavailable, but it + # # seems correct for it to be part of the notifier task, thoughts? + # try: + # installation_name_to_use = get_installation_name_for_owner_for_task( + # self.name, commit.repository.owner + # ) + # repository_service = get_repo_provider_service_for_specific_commit( + # commit, installation_name_to_use + # ) + # except RepositoryWithoutValidBotError: + # save_commit_error( + # commit, error_code=CommitErrorTypes.REPO_BOT_INVALID.value + # ) + + # log.warning( + # "Unable to start notifications because repo doesn't have a valid bot", + # extra=dict(repoid=repoid, commit=commitid), + # ) + # self.log_checkpoint(UploadFlow.NOTIF_NO_VALID_INTEGRATION) + # return {"notified": False, "notifications": None, "reason": "no_valid_bot"} + # except NoConfiguredAppsAvailable as exp: + # if exp.rate_limited_count > 0: + # # There's at least 1 app that we can use to communicate with GitHub, + # # but this app happens to be rate limited now. We try again later. + # # Min wait time of 1 minute + # retry_delay_seconds = max(60, get_seconds_to_next_hour()) + # log.warning( + # "Unable to start notifications. Retrying again later.", + # extra=dict( + # repoid=repoid, + # commit=commitid, + # apps_available=exp.apps_count, + # apps_rate_limited=exp.rate_limited_count, + # apps_suspended=exp.suspended_count, + # countdown_seconds=retry_delay_seconds, + # ), + # ) + # return self._attempt_retry( + # max_retries=10, + # countdown=retry_delay_seconds, + # current_yaml=current_yaml, + # commit=commit, + # **kwargs, + # ) + # # Maybe we have apps that are suspended. We can't communicate with github. + # log.warning( + # "We can't find an app to communicate with GitHub. 
Not notifying.", + # extra=dict( + # repoid=repoid, + # commit=commitid, + # apps_available=exp.apps_count, + # apps_suspended=exp.suspended_count, + # ), + # ) + # self.log_checkpoint(UploadFlow.NOTIF_NO_APP_INSTALLATION) + # return { + # "notified": False, + # "notifications": None, + # "reason": "no_valid_github_app_found", + # } + + # if current_yaml is None: + # current_yaml = async_to_sync(get_current_yaml)(commit, repository_service) + # else: + # current_yaml = UserYaml.from_dict(current_yaml) + + # try: + # ci_results = self.fetch_and_update_whether_ci_passed( + # repository_service, commit, current_yaml + # ) + # except TorngitClientError as ex: + # log.info( + # "Unable to fetch CI results due to a client problem. Not notifying user", + # extra=dict(repoid=commit.repoid, commit=commit.commitid, code=ex.code), + # ) + # self.log_checkpoint(UploadFlow.NOTIF_GIT_CLIENT_ERROR) + # return { + # "notified": False, + # "notifications": None, + # "reason": "not_able_fetch_ci_result", + # } + # except TorngitServerFailureError: + # log.info( + # "Unable to fetch CI results due to server issues. Not notifying user", + # extra=dict(repoid=commit.repoid, commit=commit.commitid), + # ) + # self.log_checkpoint(UploadFlow.NOTIF_GIT_SERVICE_ERROR) + # return { + # "notified": False, + # "notifications": None, + # "reason": "server_issues_ci_result", + # } + + # Should not notify based on type of upload (empty upload) + # Should not notify based on pending notifications + + + + + if not notifications_called: + UploadFlow.log(UploadFlow.SKIPPING_NOTIFICATION) + + + + + + ############## From processing + + log.debug("In finish_reports_processing for commit: %s" % commit) + commitid = commit.commitid + repoid = commit.repoid + + # always notify, let the notify handle if it should submit + notifications_called = False + if not regexp_ci_skip.search(commit.message or ""): + match self.should_call_notifications( + commit, commit_yaml, processing_results, report_code + ): + case ShouldCallNotifyResult.NOTIFY: + notifications_called = True + notify_kwargs = { + "repoid": repoid, + "commitid": commitid, + "current_yaml": commit_yaml.to_dict(), + } + notify_kwargs = UploadFlow.save_to_kwargs(notify_kwargs) + task = self.app.tasks[notify_task_name].apply_async( + kwargs=notify_kwargs + ) + log.info( + "Scheduling notify task", + extra=dict( + repoid=repoid, + commit=commitid, + commit_yaml=commit_yaml.to_dict(), + processing_results=processing_results, + notify_task_id=task.id, + parent_task=self.request.parent_id, + ), + ) + if commit.pullid: + pull = ( + db_session.query(Pull) + .filter_by(repoid=commit.repoid, pullid=commit.pullid) + .first() + ) + if pull: + head = pull.get_head_commit() + if head is None or head.timestamp <= commit.timestamp: + pull.head = commit.commitid + if pull.head == commit.commitid: + db_session.commit() + self.app.tasks[pulls_task_name].apply_async( + kwargs=dict( + repoid=repoid, + pullid=pull.pullid, + should_send_notifications=False, + ) + ) + compared_to = pull.get_comparedto_commit() + if compared_to: + comparison = get_or_create_comparison( + db_session, compared_to, commit + ) + db_session.commit() + self.app.tasks[ + compute_comparison_task_name + ].apply_async( + kwargs=dict(comparison_id=comparison.id) + ) + case ShouldCallNotifyResult.DO_NOT_NOTIFY: + notifications_called = False + log.info( + "Skipping notify task", + extra=dict( + repoid=repoid, + commit=commitid, + commit_yaml=commit_yaml.to_dict(), + processing_results=processing_results, + 
parent_task=self.request.parent_id, + ), + ) + case ShouldCallNotifyResult.NOTIFY_ERROR: + notifications_called = False + notify_error_kwargs = { + "repoid": repoid, + "commitid": commitid, + "current_yaml": commit_yaml.to_dict(), + } + notify_error_kwargs = UploadFlow.save_to_kwargs(notify_error_kwargs) + task = self.app.tasks[notify_error_task_name].apply_async( + kwargs=notify_error_kwargs + ) + else: + commit.state = "skipped" + + UploadFlow.log(UploadFlow.PROCESSING_COMPLETE) + if not notifications_called: + UploadFlow.log(UploadFlow.SKIPPING_NOTIFICATION) + + return {"notifications_called": notifications_called} + # except LockError: + # log.warning("Unable to acquire lock", extra=dict(lock_name=lock_name)) + # UploadFlow.log(UploadFlow.FINISHER_LOCK_ERROR) + + ############## From processing + + ######### Possibly move this logic to the orchestrator task as a requirement + try: + installation_name_to_use = get_installation_name_for_owner_for_task( + self.name, commit.repository.owner + ) + repository_service = get_repo_provider_service_for_specific_commit( + commit, installation_name_to_use + ) + except RepositoryWithoutValidBotError: + save_commit_error( + commit, error_code=CommitErrorTypes.REPO_BOT_INVALID.value + ) + + log.warning( + "Unable to start notifications because repo doesn't have a valid bot", + extra=dict(repoid=repoid, commit=commitid), + ) + self.log_checkpoint(UploadFlow.NOTIF_NO_VALID_INTEGRATION) + return {"notified": False, "notifications": None, "reason": "no_valid_bot"} + except NoConfiguredAppsAvailable as exp: + if exp.rate_limited_count > 0: + # There's at least 1 app that we can use to communicate with GitHub, + # but this app happens to be rate limited now. We try again later. + # Min wait time of 1 minute + retry_delay_seconds = max(60, get_seconds_to_next_hour()) + log.warning( + "Unable to start notifications. Retrying again later.", + extra=dict( + repoid=repoid, + commit=commitid, + apps_available=exp.apps_count, + apps_rate_limited=exp.rate_limited_count, + apps_suspended=exp.suspended_count, + countdown_seconds=retry_delay_seconds, + ), + ) + return self._attempt_retry( + max_retries=10, + countdown=retry_delay_seconds, + current_yaml=current_yaml, + commit=commit, + **kwargs, + ) + # Maybe we have apps that are suspended. We can't communicate with github. + log.warning( + "We can't find an app to communicate with GitHub. Not notifying.", + extra=dict( + repoid=repoid, + commit=commitid, + apps_available=exp.apps_count, + apps_suspended=exp.suspended_count, + ), + ) + self.log_checkpoint(UploadFlow.NOTIF_NO_APP_INSTALLATION) + return { + "notified": False, + "notifications": None, + "reason": "no_valid_github_app_found", + } + + if current_yaml is None: + current_yaml = async_to_sync(get_current_yaml)(commit, repository_service) + else: + current_yaml = UserYaml.from_dict(current_yaml) + + try: + ci_results = self.fetch_and_update_whether_ci_passed( + repository_service, commit, current_yaml + ) + except TorngitClientError as ex: + log.info( + "Unable to fetch CI results due to a client problem. Not notifying user", + extra=dict(repoid=commit.repoid, commit=commit.commitid, code=ex.code), + ) + self.log_checkpoint(UploadFlow.NOTIF_GIT_CLIENT_ERROR) + return { + "notified": False, + "notifications": None, + "reason": "not_able_fetch_ci_result", + } + except TorngitServerFailureError: + log.info( + "Unable to fetch CI results due to server issues. 
Not notifying user", + extra=dict(repoid=commit.repoid, commit=commit.commitid), + ) + self.log_checkpoint(UploadFlow.NOTIF_GIT_SERVICE_ERROR) + return { + "notified": False, + "notifications": None, + "reason": "server_issues_ci_result", + } + if self.should_wait_longer(current_yaml, commit, ci_results): + log.info( + "Not sending notifications yet because we are waiting for CI to finish", + extra=dict(repoid=commit.repoid, commit=commit.commitid), + ) + ghapp_default_installations = list( + filter( + lambda obj: obj.name == installation_name_to_use + and obj.is_configured(), + commit.repository.owner.github_app_installations or [], + ) + ) + rely_on_webhook_ghapp = ghapp_default_installations != [] and any( + obj.is_repo_covered_by_integration(commit.repository) + for obj in ghapp_default_installations + ) + rely_on_webhook_legacy = commit.repository.using_integration + if ( + rely_on_webhook_ghapp + or rely_on_webhook_legacy + or commit.repository.hookid + ): + # rely on the webhook, but still retry in case we miss the webhook + max_retries = 5 + countdown = (60 * 3) * 2**self.request.retries + else: + max_retries = 10 + countdown = 15 * 2**self.request.retries + return self._attempt_retry( + max_retries=max_retries, + countdown=countdown, + current_yaml=current_yaml, + commit=commit, + **kwargs, + ) + + report_service = ReportService( + current_yaml, gh_app_installation_name=installation_name_to_use + ) + head_report = report_service.get_existing_report_for_commit( + commit, report_class=ReadOnlyReport + ) + if self.should_send_notifications( + current_yaml, commit, ci_results, head_report + ): + enriched_pull = async_to_sync( + fetch_and_update_pull_request_information_from_commit + )(repository_service, commit, current_yaml) + if enriched_pull and enriched_pull.database_pull: + pull = enriched_pull.database_pull + base_commit = self.fetch_pull_request_base(pull) + else: + pull = None + base_commit = self.fetch_parent(commit) + + if ( + enriched_pull + and not self.send_notifications_if_commit_differs_from_pulls_head( + commit, enriched_pull, current_yaml + ) + and empty_upload is None + ): + log.info( + "Not sending notifications for commit when it differs from pull's most recent head", + extra=dict( + commit=commit.commitid, + repoid=commit.repoid, + current_yaml=current_yaml.to_dict(), + pull_head=enriched_pull.provider_pull["head"]["commitid"], + ), + ) + self.log_checkpoint(UploadFlow.NOTIF_STALE_HEAD) + return { + "notified": False, + "notifications": None, + "reason": "User doesnt want notifications warning them that current head differs from pull request most recent head.", + } + + if base_commit is not None: + base_report = report_service.get_existing_report_for_commit( + base_commit, report_class=ReadOnlyReport + ) + else: + base_report = None + if head_report is None and empty_upload is None: + self.log_checkpoint(UploadFlow.NOTIF_ERROR_NO_REPORT) + return { + "notified": False, + "notifications": None, + "reason": "no_head_report", + } + + ######### Possibly move this logic to the orchestrator task as a requirement + + + + # log.info("Starting notifications", extra=dict(commit=commitid, repoid=repoid)) + # commits_query = db_session.query(Commit).filter( + # Commit.repoid == repoid, Commit.commitid == commitid + # ) + # commit: Commit = commits_query.first() + # assert commit, "Commit not found in database." 
+ + # test_result_commit_report = commit.commit_report(ReportType.TEST_RESULTS) + # if ( + # test_result_commit_report is not None + # and test_result_commit_report.test_result_totals is not None + # and not test_result_commit_report.test_result_totals.error + # and test_result_commit_report.test_result_totals.failed > 0 + # ): + # return { + # "notify_attempted": False, + # "notifications": None, + # "reason": "test_failures", + # } + + # try: + # installation_name_to_use = get_installation_name_for_owner_for_task( + # self.name, commit.repository.owner + # ) + # repository_service = get_repo_provider_service_for_specific_commit( + # commit, installation_name_to_use + # ) + # except RepositoryWithoutValidBotError: + # save_commit_error( + # commit, error_code=CommitErrorTypes.REPO_BOT_INVALID.value + # ) + + # log.warning( + # "Unable to start notifications because repo doesn't have a valid bot", + # extra=dict(repoid=repoid, commit=commitid), + # ) + # self.log_checkpoint(UploadFlow.NOTIF_NO_VALID_INTEGRATION) + # return {"notified": False, "notifications": None, "reason": "no_valid_bot"} + # except NoConfiguredAppsAvailable as exp: + # if exp.rate_limited_count > 0: + # # There's at least 1 app that we can use to communicate with GitHub, + # # but this app happens to be rate limited now. We try again later. + # # Min wait time of 1 minute + # retry_delay_seconds = max(60, get_seconds_to_next_hour()) + # log.warning( + # "Unable to start notifications. Retrying again later.", + # extra=dict( + # repoid=repoid, + # commit=commitid, + # apps_available=exp.apps_count, + # apps_rate_limited=exp.rate_limited_count, + # apps_suspended=exp.suspended_count, + # countdown_seconds=retry_delay_seconds, + # ), + # ) + # return self._attempt_retry( + # max_retries=10, + # countdown=retry_delay_seconds, + # current_yaml=current_yaml, + # commit=commit, + # **kwargs, + # ) + # # Maybe we have apps that are suspended. We can't communicate with github. + # log.warning( + # "We can't find an app to communicate with GitHub. Not notifying.", + # extra=dict( + # repoid=repoid, + # commit=commitid, + # apps_available=exp.apps_count, + # apps_suspended=exp.suspended_count, + # ), + # ) + # self.log_checkpoint(UploadFlow.NOTIF_NO_APP_INSTALLATION) + # return { + # "notified": False, + # "notifications": None, + # "reason": "no_valid_github_app_found", + # } + + # if current_yaml is None: + # current_yaml = async_to_sync(get_current_yaml)(commit, repository_service) + # else: + # current_yaml = UserYaml.from_dict(current_yaml) + + # try: + # ci_results = self.fetch_and_update_whether_ci_passed( + # repository_service, commit, current_yaml + # ) + # except TorngitClientError as ex: + # log.info( + # "Unable to fetch CI results due to a client problem. Not notifying user", + # extra=dict(repoid=commit.repoid, commit=commit.commitid, code=ex.code), + # ) + # self.log_checkpoint(UploadFlow.NOTIF_GIT_CLIENT_ERROR) + # return { + # "notified": False, + # "notifications": None, + # "reason": "not_able_fetch_ci_result", + # } + # except TorngitServerFailureError: + # log.info( + # "Unable to fetch CI results due to server issues. 
Not notifying user", + # extra=dict(repoid=commit.repoid, commit=commit.commitid), + # ) + # self.log_checkpoint(UploadFlow.NOTIF_GIT_SERVICE_ERROR) + # return { + # "notified": False, + # "notifications": None, + # "reason": "server_issues_ci_result", + # } + # if self.should_wait_longer(current_yaml, commit, ci_results): + # log.info( + # "Not sending notifications yet because we are waiting for CI to finish", + # extra=dict(repoid=commit.repoid, commit=commit.commitid), + # ) + # ghapp_default_installations = list( + # filter( + # lambda obj: obj.name == installation_name_to_use + # and obj.is_configured(), + # commit.repository.owner.github_app_installations or [], + # ) + # ) + # rely_on_webhook_ghapp = ghapp_default_installations != [] and any( + # obj.is_repo_covered_by_integration(commit.repository) + # for obj in ghapp_default_installations + # ) + # rely_on_webhook_legacy = commit.repository.using_integration + # if ( + # rely_on_webhook_ghapp + # or rely_on_webhook_legacy + # or commit.repository.hookid + # ): + # # rely on the webhook, but still retry in case we miss the webhook + # max_retries = 5 + # countdown = (60 * 3) * 2**self.request.retries + # else: + # max_retries = 10 + # countdown = 15 * 2**self.request.retries + # return self._attempt_retry( + # max_retries=max_retries, + # countdown=countdown, + # current_yaml=current_yaml, + # commit=commit, + # **kwargs, + # ) + + # report_service = ReportService( + # current_yaml, gh_app_installation_name=installation_name_to_use + # ) + # head_report = report_service.get_existing_report_for_commit( + # commit, report_class=ReadOnlyReport + # ) + # if self.should_send_notifications( + # current_yaml, commit, ci_results, head_report + # ): + # enriched_pull = async_to_sync( + # fetch_and_update_pull_request_information_from_commit + # )(repository_service, commit, current_yaml) + # if enriched_pull and enriched_pull.database_pull: + # pull = enriched_pull.database_pull + # base_commit = self.fetch_pull_request_base(pull) + # else: + # pull = None + # base_commit = self.fetch_parent(commit) + + # if ( + # enriched_pull + # and not self.send_notifications_if_commit_differs_from_pulls_head( + # commit, enriched_pull, current_yaml + # ) + # and empty_upload is None + # ): + # log.info( + # "Not sending notifications for commit when it differs from pull's most recent head", + # extra=dict( + # commit=commit.commitid, + # repoid=commit.repoid, + # current_yaml=current_yaml.to_dict(), + # pull_head=enriched_pull.provider_pull["head"]["commitid"], + # ), + # ) + # self.log_checkpoint(UploadFlow.NOTIF_STALE_HEAD) + # return { + # "notified": False, + # "notifications": None, + # "reason": "User doesnt want notifications warning them that current head differs from pull request most recent head.", + # } + + # if base_commit is not None: + # base_report = report_service.get_existing_report_for_commit( + # base_commit, report_class=ReadOnlyReport + # ) + # else: + # base_report = None + # if head_report is None and empty_upload is None: + # self.log_checkpoint(UploadFlow.NOTIF_ERROR_NO_REPORT) + # return { + # "notified": False, + # "notifications": None, + # "reason": "no_head_report", + # } + + # if commit.repository.service == "gitlab": + # gitlab_extra_shas_to_notify = self.get_gitlab_extra_shas_to_notify( + # commit, repository_service + # ) + # else: + # gitlab_extra_shas_to_notify = None + + # log.info( + # "We are going to be sending notifications", + # extra=dict( + # commit=commit.commitid, + # repoid=commit.repoid, + # 
current_yaml=current_yaml.to_dict(), + # ), + # ) + # notifications = self.submit_third_party_notifications( + # current_yaml, + # base_commit, + # commit, + # base_report, + # head_report, + # enriched_pull, + # repository_service, + # empty_upload, + # all_tests_passed=( + # test_result_commit_report is not None + # and test_result_commit_report.test_result_totals is not None + # and test_result_commit_report.test_result_totals.error is None + # and test_result_commit_report.test_result_totals.failed == 0 + # ), + # test_results_error=( + # test_result_commit_report is not None + # and test_result_commit_report.test_result_totals is not None + # and test_result_commit_report.test_result_totals.error + # ), + # installation_name_to_use=installation_name_to_use, + # gh_is_using_codecov_commenter=self.is_using_codecov_commenter( + # repository_service + # ), + # gitlab_extra_shas_to_notify=gitlab_extra_shas_to_notify, + # ) + # self.log_checkpoint(UploadFlow.NOTIFIED) + # log.info( + # "Notifications done", + # extra=dict( + # notifications=notifications, + # notification_count=len(notifications), + # commit=commit.commitid, + # repoid=commit.repoid, + # pullid=pull.pullid if pull is not None else None, + # ), + # ) + # db_session.commit() + # return {"notified": True, "notifications": notifications} + # else: + # log.info( + # "Not sending notifications at all", + # extra=dict(commit=commit.commitid, repoid=commit.repoid), + # ) + # self.log_checkpoint(UploadFlow.SKIPPING_NOTIFICATION) + # return {"notified": False, "notifications": None} + + + + + # def log_checkpoint(self, checkpoint): + # """ + # Only log a checkpoint if whoever scheduled us sent checkpoints data from + # the same flow. + + # The notify task is an important part of `UploadFlow`, but it's also used + # elsewhere. If this instance of the notify task wasn't scheduled as part + # of upload processing, attempting to log `UploadFlow` checkpoints for it + # will pollute our metrics. 
+ # """ + # if UploadFlow.has_begun(): + # UploadFlow.log(checkpoint) + + # Notify task signature + # notify_kwargs = { + # "repoid": repoid, + # "commitid": commitid, + # "current_yaml": commit_yaml.to_dict(), + # } + # notify_kwargs = UploadFlow.save_to_kwargs(notify_kwargs) + # task = self.app.tasks[notify_task_name].apply_async( + # kwargs=notify_kwargs + # ) + + + def _attempt_retry( + self, + max_retries: int, + countdown: int, + commit: Commit, + current_yaml: Optional[UserYaml], + *args, + **kwargs, + ) -> None: + try: + self.retry(max_retries=max_retries, countdown=countdown) + except MaxRetriesExceededError: + log.warning( + "Not attempting to retry notifications since we already retried too many times", + extra=dict( + repoid=commit.repoid, + commit=commit.commitid, + max_retries=max_retries, + next_countdown_would_be=countdown, + current_yaml=current_yaml.to_dict(), + ), + ) + self.log_checkpoint(UploadFlow.NOTIF_TOO_MANY_RETRIES) + return { + "notified": False, + "notifications": None, + "reason": "too_many_retries", + } + +RegisteredNotificationOrchestratorTask = celery_app.register_task(NotificationOrchestratorTask()) +notification_orchestrator_task = celery_app.tasks[RegisteredNotificationOrchestratorTask.name] \ No newline at end of file diff --git a/tasks/notify.py b/tasks/notify.py index 927cdec5f..55da99eb4 100644 --- a/tasks/notify.py +++ b/tasks/notify.py @@ -18,22 +18,19 @@ from shared.django_apps.codecov_auth.models import Service from shared.reports.readonly import ReadOnlyReport from shared.torngit.base import TokenType, TorngitBaseAdapter -from shared.torngit.exceptions import TorngitClientError, TorngitServerFailureError from shared.typings.torngit import OwnerInfo, RepoInfo, TorngitInstanceData from shared.yaml import UserYaml from sqlalchemy import and_ from sqlalchemy.orm.session import Session from app import celery_app -from database.enums import CommitErrorTypes, Decoration, NotificationState, ReportType +from database.enums import Decoration, NotificationState, ReportType from database.models import Commit, Pull from database.models.core import GITHUB_APP_INSTALLATION_DEFAULT_NAME, CompareCommit from helpers.checkpoint_logger.flows import UploadFlow -from helpers.clock import get_seconds_to_next_hour from helpers.comparison import minimal_totals -from helpers.exceptions import NoConfiguredAppsAvailable, RepositoryWithoutValidBotError +from helpers.exceptions import RepositoryWithoutValidBotError from helpers.github_installation import get_installation_name_for_owner_for_task -from helpers.save_commit_error import save_commit_error from services.activation import activate_user from services.commit_status import RepositoryCIFilter from services.comparison import ( @@ -201,6 +198,8 @@ def run_impl_within_lock( "reason": "test_failures", } + # ASK: this installation/bot related logic impedes notification if unavailable, but it + # seems correct for it to be part of the notifier task, thoughts? try: installation_name_to_use = get_installation_name_for_owner_for_task( self.name, commit.repository.owner @@ -208,124 +207,101 @@ def run_impl_within_lock( repository_service = get_repo_provider_service_for_specific_commit( commit, installation_name_to_use ) + # The NoConfiguredAppsAvailable exception was moved to the notification orchestrator task + # as once it gets to the notifications it should be cleared of that exception. 
In theory + # the RepositoryWithoutValidBotError exception could also be moved except RepositoryWithoutValidBotError: - save_commit_error( - commit, error_code=CommitErrorTypes.REPO_BOT_INVALID.value - ) - log.warning( "Unable to start notifications because repo doesn't have a valid bot", extra=dict(repoid=repoid, commit=commitid), ) self.log_checkpoint(UploadFlow.NOTIF_NO_VALID_INTEGRATION) return {"notified": False, "notifications": None, "reason": "no_valid_bot"} - except NoConfiguredAppsAvailable as exp: - if exp.rate_limited_count > 0: - # There's at least 1 app that we can use to communicate with GitHub, - # but this app happens to be rate limited now. We try again later. - # Min wait time of 1 minute - retry_delay_seconds = max(60, get_seconds_to_next_hour()) - log.warning( - "Unable to start notifications. Retrying again later.", - extra=dict( - repoid=repoid, - commit=commitid, - apps_available=exp.apps_count, - apps_rate_limited=exp.rate_limited_count, - apps_suspended=exp.suspended_count, - countdown_seconds=retry_delay_seconds, - ), - ) - return self._attempt_retry( - max_retries=10, - countdown=retry_delay_seconds, - current_yaml=current_yaml, - commit=commit, - **kwargs, - ) - # Maybe we have apps that are suspended. We can't communicate with github. - log.warning( - "We can't find an app to communicate with GitHub. Not notifying.", - extra=dict( - repoid=repoid, - commit=commitid, - apps_available=exp.apps_count, - apps_suspended=exp.suspended_count, - ), - ) - self.log_checkpoint(UploadFlow.NOTIF_NO_APP_INSTALLATION) - return { - "notified": False, - "notifications": None, - "reason": "no_valid_github_app_found", - } + + # except NoConfiguredAppsAvailable as exp: + # # Maybe we have apps that are suspended. We can't communicate with github. + # log.warning( + # "We can't find an app to communicate with GitHub. Not notifying.", + # extra=dict( + # repoid=repoid, + # commit=commitid, + # apps_available=exp.apps_count, + # apps_suspended=exp.suspended_count, + # ), + # ) + # self.log_checkpoint(UploadFlow.NOTIF_NO_APP_INSTALLATION) + # return { + # "notified": False, + # "notifications": None, + # "reason": "no_valid_github_app_found", + # } if current_yaml is None: current_yaml = async_to_sync(get_current_yaml)(commit, repository_service) else: current_yaml = UserYaml.from_dict(current_yaml) - try: - ci_results = self.fetch_and_update_whether_ci_passed( - repository_service, commit, current_yaml - ) - except TorngitClientError as ex: - log.info( - "Unable to fetch CI results due to a client problem. Not notifying user", - extra=dict(repoid=commit.repoid, commit=commit.commitid, code=ex.code), - ) - self.log_checkpoint(UploadFlow.NOTIF_GIT_CLIENT_ERROR) - return { - "notified": False, - "notifications": None, - "reason": "not_able_fetch_ci_result", - } - except TorngitServerFailureError: - log.info( - "Unable to fetch CI results due to server issues. 
Not notifying user", - extra=dict(repoid=commit.repoid, commit=commit.commitid), - ) - self.log_checkpoint(UploadFlow.NOTIF_GIT_SERVICE_ERROR) - return { - "notified": False, - "notifications": None, - "reason": "server_issues_ci_result", - } - if self.should_wait_longer(current_yaml, commit, ci_results): - log.info( - "Not sending notifications yet because we are waiting for CI to finish", - extra=dict(repoid=commit.repoid, commit=commit.commitid), - ) - ghapp_default_installations = list( - filter( - lambda obj: obj.name == installation_name_to_use - and obj.is_configured(), - commit.repository.owner.github_app_installations or [], - ) - ) - rely_on_webhook_ghapp = ghapp_default_installations != [] and any( - obj.is_repo_covered_by_integration(commit.repository) - for obj in ghapp_default_installations - ) - rely_on_webhook_legacy = commit.repository.using_integration - if ( - rely_on_webhook_ghapp - or rely_on_webhook_legacy - or commit.repository.hookid - ): - # rely on the webhook, but still retry in case we miss the webhook - max_retries = 5 - countdown = (60 * 3) * 2**self.request.retries - else: - max_retries = 10 - countdown = 15 * 2**self.request.retries - return self._attempt_retry( - max_retries=max_retries, - countdown=countdown, - current_yaml=current_yaml, - commit=commit, - **kwargs, - ) + # try: + # ci_results = self.fetch_and_update_whether_ci_passed( + # repository_service, commit, current_yaml + # ) + # except TorngitClientError as ex: + # log.info( + # "Unable to fetch CI results due to a client problem. Not notifying user", + # extra=dict(repoid=commit.repoid, commit=commit.commitid, code=ex.code), + # ) + # self.log_checkpoint(UploadFlow.NOTIF_GIT_CLIENT_ERROR) + # return { + # "notified": False, + # "notifications": None, + # "reason": "not_able_fetch_ci_result", + # } + # except TorngitServerFailureError: + # log.info( + # "Unable to fetch CI results due to server issues. 
Not notifying user", + # extra=dict(repoid=commit.repoid, commit=commit.commitid), + # ) + # self.log_checkpoint(UploadFlow.NOTIF_GIT_SERVICE_ERROR) + # return { + # "notified": False, + # "notifications": None, + # "reason": "server_issues_ci_result", + # } + # if self.should_wait_longer(current_yaml, commit, ci_results): + # log.info( + # "Not sending notifications yet because we are waiting for CI to finish", + # extra=dict(repoid=commit.repoid, commit=commit.commitid), + # ) + # ghapp_default_installations = list( + # filter( + # lambda obj: obj.name == installation_name_to_use + # and obj.is_configured(), + # commit.repository.owner.github_app_installations or [], + # ) + # ) + # rely_on_webhook_ghapp = ghapp_default_installations != [] and any( + # obj.is_repo_covered_by_integration(commit.repository) + # for obj in ghapp_default_installations + # ) + # rely_on_webhook_legacy = commit.repository.using_integration + # if ( + # rely_on_webhook_ghapp + # or rely_on_webhook_legacy + # or commit.repository.hookid + # ): + # # rely on the webhook, but still retry in case we miss the webhook + # max_retries = 5 + # countdown = (60 * 3) * 2**self.request.retries + # else: + # max_retries = 10 + # countdown = 15 * 2**self.request.retries + # return self._attempt_retry( + # max_retries=max_retries, + # countdown=countdown, + # current_yaml=current_yaml, + # commit=commit, + # **kwargs, + # ) report_service = ReportService( current_yaml, gh_app_installation_name=installation_name_to_use @@ -333,117 +309,118 @@ def run_impl_within_lock( head_report = report_service.get_existing_report_for_commit( commit, report_class=ReadOnlyReport ) - if self.should_send_notifications( - current_yaml, commit, ci_results, head_report - ): - enriched_pull = async_to_sync( - fetch_and_update_pull_request_information_from_commit - )(repository_service, commit, current_yaml) - if enriched_pull and enriched_pull.database_pull: - pull = enriched_pull.database_pull - base_commit = self.fetch_pull_request_base(pull) - else: - pull = None - base_commit = self.fetch_parent(commit) - - if ( - enriched_pull - and not self.send_notifications_if_commit_differs_from_pulls_head( - commit, enriched_pull, current_yaml - ) - and empty_upload is None - ): - log.info( - "Not sending notifications for commit when it differs from pull's most recent head", - extra=dict( - commit=commit.commitid, - repoid=commit.repoid, - current_yaml=current_yaml.to_dict(), - pull_head=enriched_pull.provider_pull["head"]["commitid"], - ), - ) - self.log_checkpoint(UploadFlow.NOTIF_STALE_HEAD) - return { - "notified": False, - "notifications": None, - "reason": "User doesnt want notifications warning them that current head differs from pull request most recent head.", - } - - if base_commit is not None: - base_report = report_service.get_existing_report_for_commit( - base_commit, report_class=ReadOnlyReport - ) - else: - base_report = None - if head_report is None and empty_upload is None: - self.log_checkpoint(UploadFlow.NOTIF_ERROR_NO_REPORT) - return { - "notified": False, - "notifications": None, - "reason": "no_head_report", - } - - if commit.repository.service == "gitlab": - gitlab_extra_shas_to_notify = self.get_gitlab_extra_shas_to_notify( - commit, repository_service - ) - else: - gitlab_extra_shas_to_notify = None + enriched_pull = async_to_sync( + fetch_and_update_pull_request_information_from_commit + )(repository_service, commit, current_yaml) + + if enriched_pull and enriched_pull.database_pull: + pull = enriched_pull.database_pull + 
base_commit = self.fetch_pull_request_base(pull) + else: + pull = None + base_commit = self.fetch_parent(commit) + if ( + enriched_pull + # TODO: I believe this should be something we extract to the notification orchestrator + # but it seems to intertwined with the logic here. The enriched_pull call would need to be + # cached before moving that to the orchestrator to avoid redundant API calls + and not self.send_notifications_if_commit_differs_from_pulls_head( + commit, enriched_pull, current_yaml + ) + and empty_upload is None + ): log.info( - "We are going to be sending notifications", + "Not sending notifications for commit when it differs from pull's most recent head", extra=dict( commit=commit.commitid, repoid=commit.repoid, current_yaml=current_yaml.to_dict(), + pull_head=enriched_pull.provider_pull["head"]["commitid"], ), ) - notifications = self.submit_third_party_notifications( - current_yaml, - base_commit, - commit, - base_report, - head_report, - enriched_pull, - repository_service, - empty_upload, - all_tests_passed=( - test_result_commit_report is not None - and test_result_commit_report.test_result_totals is not None - and test_result_commit_report.test_result_totals.error is None - and test_result_commit_report.test_result_totals.failed == 0 - ), - test_results_error=( - test_result_commit_report is not None - and test_result_commit_report.test_result_totals is not None - and test_result_commit_report.test_result_totals.error - ), - installation_name_to_use=installation_name_to_use, - gh_is_using_codecov_commenter=self.is_using_codecov_commenter( - repository_service - ), - gitlab_extra_shas_to_notify=gitlab_extra_shas_to_notify, - ) - self.log_checkpoint(UploadFlow.NOTIFIED) - log.info( - "Notifications done", - extra=dict( - notifications=notifications, - notification_count=len(notifications), - commit=commit.commitid, - repoid=commit.repoid, - pullid=pull.pullid if pull is not None else None, - ), + self.log_checkpoint(UploadFlow.NOTIF_STALE_HEAD) + return { + "notified": False, + "notifications": None, + "reason": "User doesnt want notifications warning them that current head differs from pull request most recent head.", + } + + if base_commit is not None: + base_report = report_service.get_existing_report_for_commit( + base_commit, report_class=ReadOnlyReport ) - db_session.commit() - return {"notified": True, "notifications": notifications} else: - log.info( - "Not sending notifications at all", - extra=dict(commit=commit.commitid, repoid=commit.repoid), + base_report = None + if head_report is None and empty_upload is None: + self.log_checkpoint(UploadFlow.NOTIF_ERROR_NO_REPORT) + return { + "notified": False, + "notifications": None, + "reason": "no_head_report", + } + + if commit.repository.service == "gitlab": + gitlab_extra_shas_to_notify = self.get_gitlab_extra_shas_to_notify( + commit, repository_service ) - self.log_checkpoint(UploadFlow.SKIPPING_NOTIFICATION) - return {"notified": False, "notifications": None} + else: + gitlab_extra_shas_to_notify = None + + log.info( + "We are going to be sending notifications", + extra=dict( + commit=commit.commitid, + repoid=commit.repoid, + current_yaml=current_yaml.to_dict(), + ), + ) + notifications = self.submit_third_party_notifications( + current_yaml, + base_commit, + commit, + base_report, + head_report, + enriched_pull, + repository_service, + empty_upload, + all_tests_passed=( + test_result_commit_report is not None + and test_result_commit_report.test_result_totals is not None + and 
test_result_commit_report.test_result_totals.error is None + and test_result_commit_report.test_result_totals.failed == 0 + ), + test_results_error=( + test_result_commit_report is not None + and test_result_commit_report.test_result_totals is not None + and test_result_commit_report.test_result_totals.error + ), + installation_name_to_use=installation_name_to_use, + gh_is_using_codecov_commenter=self.is_using_codecov_commenter( + repository_service + ), + gitlab_extra_shas_to_notify=gitlab_extra_shas_to_notify, + ) + self.log_checkpoint(UploadFlow.NOTIFIED) + log.info( + "Notifications done", + extra=dict( + notifications=notifications, + notification_count=len(notifications), + commit=commit.commitid, + repoid=commit.repoid, + pullid=pull.pullid if pull is not None else None, + ), + ) + db_session.commit() + return {"notified": True, "notifications": notifications} + # else: + # log.info( + # "Not sending notifications at all", + # extra=dict(commit=commit.commitid, repoid=commit.repoid), + # ) + # self.log_checkpoint(UploadFlow.SKIPPING_NOTIFICATION) + # return {"notified": False, "notifications": None} def is_using_codecov_commenter( self, repository_service: TorngitBaseAdapter diff --git a/tasks/upload_finisher.py b/tasks/upload_finisher.py index 91968617b..35d12312f 100644 --- a/tasks/upload_finisher.py +++ b/tasks/upload_finisher.py @@ -1,17 +1,13 @@ import logging import random -import re from datetime import datetime, timedelta, timezone -from enum import Enum import sentry_sdk from asgiref.sync import async_to_sync from redis.exceptions import LockError from redis.lock import Lock from shared.celery_config import ( - compute_comparison_task_name, - notify_task_name, - pulls_task_name, + notification_orchestrator_task_name, timeseries_save_commit_measurements_task_name, upload_finisher_task_name, ) @@ -20,39 +16,29 @@ from shared.yaml import UserYaml from app import celery_app -from celery_config import notify_error_task_name from database.enums import CommitErrorTypes -from database.models import Commit, Pull +from database.models import Commit from database.models.core import GITHUB_APP_INSTALLATION_DEFAULT_NAME from helpers.cache import cache from helpers.checkpoint_logger.flows import UploadFlow from helpers.exceptions import RepositoryWithoutValidBotError from helpers.github_installation import get_installation_name_for_owner_for_task from helpers.save_commit_error import save_commit_error -from services.comparison import get_or_create_comparison from services.processing.intermediate import ( cleanup_intermediate_reports, load_intermediate_reports, ) from services.processing.merging import merge_reports, update_uploads -from services.processing.state import ProcessingState, should_trigger_postprocessing +from services.processing.state import ProcessingState from services.processing.types import ProcessingResult from services.redis import get_redis_connection from services.report import ReportService from services.repository import get_repo_provider_service -from services.yaml import read_yaml_field from tasks.base import BaseCodecovTask from tasks.upload_processor import MAX_RETRIES, UPLOAD_PROCESSING_LOCK_NAME log = logging.getLogger(__name__) -regexp_ci_skip = re.compile(r"\[(ci|skip| |-){3,}\]") - - -class ShouldCallNotifyResult(Enum): - DO_NOT_NOTIFY = "do_not_notify" - NOTIFY_ERROR = "notify_error" - NOTIFY = "notify" class UploadFinisherTask(BaseCodecovTask, name=upload_finisher_task_name): @@ -64,8 +50,8 @@ class UploadFinisherTask(BaseCodecovTask, 
name=upload_finisher_task_name): This task does the finishing steps after a group of uploads is processed The steps are: - - Schedule the set_pending task, depending on the case - - Schedule notification tasks, depending on the case + - Merging reports into one report for GCP and update Upload records with status + - Schedule notification orchestrator task - Invalidating whatever cache is done """ @@ -124,7 +110,6 @@ def run_impl( db_session.commit() state.mark_uploads_as_merged(upload_ids) - except LockError: max_retry = 200 * 3**self.request.retries retry_in = min(random.randint(max_retry // 2, max_retry), 60 * 60 * 5) @@ -136,21 +121,22 @@ def run_impl( cleanup_intermediate_reports(upload_ids) - if not should_trigger_postprocessing(state.get_upload_numbers()): - UploadFlow.log(UploadFlow.PROCESSING_COMPLETE) - UploadFlow.log(UploadFlow.SKIPPING_NOTIFICATION) - return - + # Call the notification orchestrator task lock_name = f"upload_finisher_lock_{repoid}_{commitid}" redis_connection = get_redis_connection() try: with redis_connection.lock(lock_name, timeout=60 * 5, blocking_timeout=5): - result = self.finish_reports_processing( - db_session, - commit, - commit_yaml, - processing_results, - report_code, + notification_orchestrator_kwargs = { + "repoid": repoid, + "commitid": commitid, + "commit_yaml": commit_yaml.to_dict(), + "processing_results": processing_results, + "report_code": report_code + } + notification_orchestrator_kwargs = UploadFlow.save_to_kwargs(notification_orchestrator_kwargs) + # TODO: add log to add the notification orchestrator task + self.app.tasks[notification_orchestrator_task_name].apply_async( + kwargs=notification_orchestrator_kwargs ) self.app.tasks[ timeseries_save_commit_measurements_task_name @@ -166,182 +152,14 @@ def run_impl( db_session.commit() self.invalidate_caches(redis_connection, commit) + log.info("Finished upload_finisher task") - return result + UploadFlow.log(UploadFlow.PROCESSING_COMPLETE) + return {"result": "finisher done"} except LockError: log.warning("Unable to acquire lock", extra=dict(lock_name=lock_name)) UploadFlow.log(UploadFlow.FINISHER_LOCK_ERROR) - def finish_reports_processing( - self, - db_session, - commit: Commit, - commit_yaml: UserYaml, - processing_results: list[ProcessingResult], - report_code, - ): - log.debug("In finish_reports_processing for commit: %s" % commit) - commitid = commit.commitid - repoid = commit.repoid - - # always notify, let the notify handle if it should submit - notifications_called = False - if not regexp_ci_skip.search(commit.message or ""): - match self.should_call_notifications( - commit, commit_yaml, processing_results, report_code - ): - case ShouldCallNotifyResult.NOTIFY: - notifications_called = True - notify_kwargs = { - "repoid": repoid, - "commitid": commitid, - "current_yaml": commit_yaml.to_dict(), - } - notify_kwargs = UploadFlow.save_to_kwargs(notify_kwargs) - task = self.app.tasks[notify_task_name].apply_async( - kwargs=notify_kwargs - ) - log.info( - "Scheduling notify task", - extra=dict( - repoid=repoid, - commit=commitid, - commit_yaml=commit_yaml.to_dict(), - processing_results=processing_results, - notify_task_id=task.id, - parent_task=self.request.parent_id, - ), - ) - if commit.pullid: - pull = ( - db_session.query(Pull) - .filter_by(repoid=commit.repoid, pullid=commit.pullid) - .first() - ) - if pull: - head = pull.get_head_commit() - if head is None or head.timestamp <= commit.timestamp: - pull.head = commit.commitid - if pull.head == commit.commitid: - db_session.commit() - 
self.app.tasks[pulls_task_name].apply_async( - kwargs=dict( - repoid=repoid, - pullid=pull.pullid, - should_send_notifications=False, - ) - ) - compared_to = pull.get_comparedto_commit() - if compared_to: - comparison = get_or_create_comparison( - db_session, compared_to, commit - ) - db_session.commit() - self.app.tasks[ - compute_comparison_task_name - ].apply_async( - kwargs=dict(comparison_id=comparison.id) - ) - case ShouldCallNotifyResult.DO_NOT_NOTIFY: - notifications_called = False - log.info( - "Skipping notify task", - extra=dict( - repoid=repoid, - commit=commitid, - commit_yaml=commit_yaml.to_dict(), - processing_results=processing_results, - parent_task=self.request.parent_id, - ), - ) - case ShouldCallNotifyResult.NOTIFY_ERROR: - notifications_called = False - notify_error_kwargs = { - "repoid": repoid, - "commitid": commitid, - "current_yaml": commit_yaml.to_dict(), - } - notify_error_kwargs = UploadFlow.save_to_kwargs(notify_error_kwargs) - task = self.app.tasks[notify_error_task_name].apply_async( - kwargs=notify_error_kwargs - ) - else: - commit.state = "skipped" - - UploadFlow.log(UploadFlow.PROCESSING_COMPLETE) - if not notifications_called: - UploadFlow.log(UploadFlow.SKIPPING_NOTIFICATION) - - return {"notifications_called": notifications_called} - - def should_call_notifications( - self, - commit: Commit, - commit_yaml: UserYaml, - processing_results: list[ProcessingResult], - report_code, - ) -> ShouldCallNotifyResult: - extra_dict = { - "repoid": commit.repoid, - "commitid": commit.commitid, - "commit_yaml": commit_yaml, - "processing_results": processing_results, - "report_code": report_code, - "parent_task": self.request.parent_id, - } - - manual_trigger = read_yaml_field( - commit_yaml, ("codecov", "notify", "manual_trigger") - ) - if manual_trigger: - log.info( - "Not scheduling notify because manual trigger is used", - extra=extra_dict, - ) - return ShouldCallNotifyResult.DO_NOT_NOTIFY - # Notifications should be off in case of local uploads, and report code wouldn't be null in that case - if report_code is not None: - log.info( - "Not scheduling notify because it's a local upload", - extra=extra_dict, - ) - return ShouldCallNotifyResult.DO_NOT_NOTIFY - - after_n_builds = ( - read_yaml_field(commit_yaml, ("codecov", "notify", "after_n_builds")) or 0 - ) - if after_n_builds > 0: - report = ReportService(commit_yaml).get_existing_report_for_commit(commit) - number_sessions = len(report.sessions) if report is not None else 0 - if after_n_builds > number_sessions: - log.info( - "Not scheduling notify because `after_n_builds` is %s and we only found %s builds", - after_n_builds, - number_sessions, - extra=extra_dict, - ) - return ShouldCallNotifyResult.DO_NOT_NOTIFY - - processing_successses = [x["successful"] for x in processing_results] - - if read_yaml_field( - commit_yaml, - ("codecov", "notify", "notify_error"), - _else=False, - ): - if len(processing_successses) == 0 or not all(processing_successses): - log.info( - "Not scheduling notify because there is a non-successful processing result", - extra=extra_dict, - ) - - return ShouldCallNotifyResult.NOTIFY_ERROR - else: - if not any(processing_successses): - return ShouldCallNotifyResult.DO_NOT_NOTIFY - - return ShouldCallNotifyResult.NOTIFY - def invalidate_caches(self, redis_connection, commit: Commit): redis_connection.delete("cache/{}/tree/{}".format(commit.repoid, commit.branch)) redis_connection.delete( @@ -354,11 +172,6 @@ def invalidate_caches(self, redis_connection, commit: Commit): if commit.branch == 
repository.branch: redis_connection.hdel("badge", ("%s:" % key).lower()) - -RegisteredUploadTask = celery_app.register_task(UploadFinisherTask()) -upload_finisher_task = celery_app.tasks[RegisteredUploadTask.name] - - def get_report_lock(repoid: int, commitid: str, hard_time_limit: int) -> Lock: lock_name = UPLOAD_PROCESSING_LOCK_NAME(repoid, commitid) redis_connection = get_redis_connection() @@ -440,3 +253,6 @@ def load_commit_diff(commit: Commit, task_name: str | None = None) -> dict | Non ) return None + +RegisteredUploadTask = celery_app.register_task(UploadFinisherTask()) +upload_finisher_task = celery_app.tasks[RegisteredUploadTask.name] \ No newline at end of file From bd0752ce7e8cde46eee597687dabb159996e3c25 Mon Sep 17 00:00:00 2001 From: Adrian Date: Fri, 3 Jan 2025 13:30:45 -0600 Subject: [PATCH 2/3] 1st pass at notification orchestrator logic --- tasks/__init__.py | 2 +- tasks/notification_orchestrator.py | 1291 ++++++++-------------------- tasks/notify.py | 127 +-- tasks/upload_finisher.py | 12 +- 4 files changed, 348 insertions(+), 1084 deletions(-) diff --git a/tasks/__init__.py b/tasks/__init__.py index b9c0761b1..a3f39ca36 100644 --- a/tasks/__init__.py +++ b/tasks/__init__.py @@ -31,8 +31,8 @@ from tasks.label_analysis import label_analysis_task from tasks.manual_trigger import manual_trigger_task from tasks.new_user_activated import new_user_activated_task -from tasks.notify import notify_task from tasks.notification_orchestrator import notification_orchestrator_task +from tasks.notify import notify_task from tasks.notify_error import notify_error_task from tasks.plan_manager_task import daily_plan_manager_task_name from tasks.preprocess_upload import preprocess_upload_task diff --git a/tasks/notification_orchestrator.py b/tasks/notification_orchestrator.py index 8aba15d2a..24cab294c 100644 --- a/tasks/notification_orchestrator.py +++ b/tasks/notification_orchestrator.py @@ -1,116 +1,68 @@ import logging -from typing import Optional - -import sentry_sdk import re -from asgiref.sync import async_to_sync -from celery.exceptions import MaxRetriesExceededError, SoftTimeLimitExceeded -from celery_config import notify_error_task_name -from shared.bots.github_apps import ( - get_github_app_token, - get_specific_github_app_details, -) +from enum import Enum +from functools import partial +from typing import Optional, TypedDict + +from celery.exceptions import MaxRetriesExceededError from shared.celery_config import ( - activate_account_user_task_name, - new_user_activated_task_name, - notification_orchestrator_task_name, - status_set_error_task_name, compute_comparison_task_name, + notification_orchestrator_task_name, notify_task_name, pulls_task_name, + status_set_error_task_name, ) -from shared.config import get_config -from shared.django_apps.codecov_auth.models import Service -from shared.reports.readonly import ReadOnlyReport -from shared.torngit.base import TokenType, TorngitBaseAdapter from shared.torngit.exceptions import TorngitClientError, TorngitServerFailureError -from shared.typings.torngit import OwnerInfo, RepoInfo, TorngitInstanceData from shared.yaml import UserYaml -from sqlalchemy import and_ from sqlalchemy.orm.session import Session -from enum import Enum from app import celery_app -from database.enums import CommitErrorTypes, Decoration, NotificationState, ReportType +from celery_config import notify_error_task_name +from database.enums import CommitErrorTypes, ReportType from database.models import Commit, Pull -from database.models.core import 
GITHUB_APP_INSTALLATION_DEFAULT_NAME, CompareCommit from helpers.checkpoint_logger.flows import UploadFlow from helpers.clock import get_seconds_to_next_hour -from helpers.comparison import minimal_totals from helpers.exceptions import NoConfiguredAppsAvailable, RepositoryWithoutValidBotError from helpers.github_installation import get_installation_name_for_owner_for_task from helpers.save_commit_error import save_commit_error -from services.activation import activate_user -from services.commit_status import RepositoryCIFilter from services.comparison import ( - ComparisonContext, - ComparisonProxy, get_or_create_comparison, ) -from services.comparison.types import Comparison, FullCommit -from services.decoration import determine_decoration_details -from services.github import get_github_app_for_commit, set_github_app_for_commit from services.lock_manager import LockManager, LockRetry, LockType -from services.notification import NotificationService +from services.processing.state import ProcessingState, should_trigger_postprocessing +from services.processing.types import ProcessingResult from services.redis import Redis, get_redis_connection from services.report import ReportService -from services.processing.types import ProcessingResult -from services.processing.state import ProcessingState, should_trigger_postprocessing from services.repository import ( - EnrichedPull, - _get_repo_provider_service_instance, - fetch_and_update_pull_request_information_from_commit, get_repo_provider_service, ) -from services.yaml import get_current_yaml, read_yaml_field +from services.yaml import read_yaml_field from tasks.base import BaseCodecovTask from tasks.upload_processor import UPLOAD_PROCESSING_LOCK_NAME log = logging.getLogger(__name__) +regexp_ci_skip = re.compile(r"\[(ci|skip| |-){3,}\]") -# This should decide if it should notify based on -# Retry if someone tries to access lock and it's busy -# Processing state from other uploads with the same repo/commit id combo -# Business logic -# Notify side-effects - # Call timeseries - # Call pull sync task - -# This needs to -# Have the same signature as the notification's task to keep notifications the same - -def has_upcoming_notifies_according_to_redis( - self, redis_connection: Redis, repoid: int, commitid: str -) -> bool: - """Checks whether there are any jobs processing according to Redis right now and, - therefore, whether more up-to-date notifications will come after this anyway - - It's very important to have this code be conservative against saying - there are upcoming notifies already. The point of this code is to - avoid extra notifications for efficiency purposes, but it is better - to send extra notifications than to lack notifications - Args: - redis_connection (Redis): The redis connection we check against - repoid (int): The repoid of the commit - commitid (str): The commitid of the commit - """ - upload_processing_lock_name = UPLOAD_PROCESSING_LOCK_NAME(repoid, commitid) - if redis_connection.get(upload_processing_lock_name): - return True - return False +class ShouldCallNotifyResult(Enum): + NOTIFY = "notify" + NOTIFY_ERROR = "notify_error" + DO_NOT_NOTIFY = "do_not_notify" + WAIT_TO_NOTIFY = "wait_to_notify" -regexp_ci_skip = re.compile(r"\[(ci|skip| |-){3,}\]") +class ShouldCallNotifyResponse(TypedDict): + notification_result: ShouldCallNotifyResult + reason: str + # This is currently not used, but my idea here was to provide it and log this message in switch statements. 
+ # Logging could become a bit less flexible but there's less logging altogether. Thoughts? + message: str -class ShouldCallNotifyResult(Enum): - WAIT_TO_NOTIFY = "wait_to_notify" - DO_NOT_NOTIFY = "do_not_notify" - NOTIFY_ERROR = "notify_error" - NOTIFY = "notify" # TODO: Ask: does this need a throws = (SoftTimeLimitExceeded,)? -class NotificationOrchestratorTask(BaseCodecovTask, name=notification_orchestrator_task_name): +class NotificationOrchestratorTask( + BaseCodecovTask, name=notification_orchestrator_task_name +): def run_impl( self, db_session: Session, @@ -123,26 +75,19 @@ def run_impl( empty_upload=None, **kwargs, ): - print("Helllllooooooo", repoid) - print("Helllllooooooo", commitid) - print("Helllllooooooo", processing_results) - print("Helllllooooooo", args) - print("Helllllooooooo", commit_yaml) - print("Helllllooooooo", empty_upload) - print("Helllllooooooo", kwargs) - lock_manager = LockManager( repoid=repoid, commitid=commitid, # Currently hardcoded to coverage, should be variable to the report type report_type=ReportType.COVERAGE, - # Unsure if we need a specific timeout hard timeout for this task + # TODO: Ask if we need a specific timeout hard timeout for this task, unsure lock_timeout=max(80, self.hard_time_limit_task), ) try: lock_acquired = False with lock_manager.locked( - lock_type=LockType.NOTIFICATION_ORCHESTRATOR, retry_num=self.request.retries + lock_type=LockType.NOTIFICATION_ORCHESTRATOR, + retry_num=self.request.retries, ): lock_acquired = True return self.run_impl_within_lock( @@ -169,32 +114,13 @@ def run_impl( ), ), ) - self.log_checkpoint(UploadFlow.NOTIF_LOCK_ERROR) + UploadFlow.log(UploadFlow.NOTIF_LOCK_ERROR) return { "notified": False, "notifications": None, "reason": "unobtainable_lock", } - - - - # redis_connection = get_redis_connection() - # if self.has_upcoming_notifies_according_to_redis( - # redis_connection, repoid, commitid - # ): - # log.info( - # "Not notifying because there are seemingly other jobs being processed yet", - # extra=dict(repoid=repoid, commitid=commitid), - # ) - # self.log_checkpoint(UploadFlow.SKIPPING_NOTIFICATION) - # return { - # "notified": False, - # "notifications": None, - # "reason": "has_other_notifies_coming", - # } - - def run_impl_within_lock( self, db_session: Session, @@ -207,71 +133,178 @@ def run_impl_within_lock( empty_upload=None, **kwargs, ): - # Should not notify based on yaml settings - # Should not notify based on type of upload (empty upload) - # Should not notify based on processing results - # Upload states - # Should not notify based on pending notifications - # Should not notify based on business logic - - # States - ### metrics log, log.info/warning. If success, side-effects - # Should not notify - # Should attempt to notify later - # Should notify - - ##### Start - redis_connection = get_redis_connection() repoid = int(repoid) - commit_yaml = UserYaml(commit_yaml) - commit = ( db_session.query(Commit) .filter(Commit.repoid == repoid, Commit.commitid == commitid) .first() ) assert commit, "Commit not found in database." 
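One thing worth settling before this lands: the WAIT_TO_NOTIFY branches further down attach extra=dict(max_retries=..., countdown=...) to the response, but ShouldCallNotifyResponse as declared above only lists notification_result, reason and message. A minimal sketch of how that retry payload could be declared on the type, assuming NotRequired is available (typing on Python 3.11+, typing_extensions otherwise):

# Sketch only: typed retry payload for WAIT_TO_NOTIFY results; ShouldCallNotifyResult
# is the enum defined earlier in this module. Assumes NotRequired (Python 3.11+ typing,
# or typing_extensions on older interpreters).
from typing import NotRequired, TypedDict


class WaitToNotifyRetryInfo(TypedDict):
    max_retries: int
    countdown: int


class ShouldCallNotifyResponse(TypedDict):
    notification_result: ShouldCallNotifyResult
    reason: str
    message: str
    # Only present on WAIT_TO_NOTIFY responses
    extra: NotRequired[WaitToNotifyRetryInfo]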
- repository = commit.repository - - state = ProcessingState(repoid, commitid) - processing_successes = [x["successful"] for x in processing_results] - notifications_called = False + commit_yaml = UserYaml(commit_yaml) - extra_dict = { + log_extra_dict = { "repoid": commit.repoid, - "commitid": commit.commitid, - "commit_yaml": commit_yaml, + "commit": commit.commitid, + "commit_yaml": commit_yaml.to_dict(), "processing_results": processing_results, "report_code": report_code, "parent_task": self.request.parent_id, } - ##### Checks other uploads and does things based on them + # Main logic that controls the notification states and decides what to do based on them + should_call_notification = self.should_call_notifications( + commit=commit, + commit_yaml=commit_yaml, + processing_results=processing_results, + report_code=report_code, + log_extra_dict=log_extra_dict, + ) + match should_call_notification: + case {"notification_result": ShouldCallNotifyResult.NOTIFY}: + notify_kwargs = { + "repoid": repoid, + "commitid": commitid, + "current_yaml": commit_yaml.to_dict(), + "empty_upload": empty_upload, + } + notify_kwargs = UploadFlow.save_to_kwargs(notify_kwargs) + task = self.app.tasks[notify_task_name].apply_async( + kwargs=notify_kwargs + ) + log.info( + "Scheduling notify task", + extra={ + **log_extra_dict, + "notify_task_id": task.id, + }, + ) + self.orchestrator_side_effects(db_session=db_session, commit=commit) + # TODO: We should add a UploadFlow.ATTEMPTING_NOTIFICATION + case { + "notification_result": ShouldCallNotifyResult.DO_NOT_NOTIFY, + "reason": reason, + }: + log.info( + "Skipping notify task", extra={**log_extra_dict, "reason": reason} + ) + UploadFlow.log(UploadFlow.SKIPPING_NOTIFICATION) + case { + "notification_result": ShouldCallNotifyResult.WAIT_TO_NOTIFY, + "reason": reason, + "extra": {"max_retries": max_retries, "countdown": countdown}, + }: + log.info( + "Unable to start notifications. Retrying again later.", + extra={ + **log_extra_dict, + "reason": reason, + "countdown": countdown, + "max_retries": max_retries, + }, + ) + return self._attempt_retry( + max_retries=max_retries, + countdown=countdown, + log_extra_dict=log_extra_dict, + ) + case {"notification_result": ShouldCallNotifyResult.NOTIFY_ERROR}: + log.info("Attempting to notify error", extra=log_extra_dict) + notify_error_kwargs = { + "repoid": repoid, + "commitid": commitid, + "current_yaml": commit_yaml.to_dict(), + } + notify_error_kwargs = UploadFlow.save_to_kwargs(notify_error_kwargs) + task = self.app.tasks[notify_error_task_name].apply_async( + kwargs=notify_error_kwargs + ) + + return should_call_notification + + def should_call_notifications( + self, + commit: Commit, + commit_yaml: UserYaml, + processing_results: list[ProcessingResult], + log_extra_dict: dict, + report_code: str | None = None, + ) -> ShouldCallNotifyResponse: + # Defines all the logic to consider a notification + should_notify_check_functions = [ + partial( + self.upload_processing_checks, processing_results=processing_results + ), + partial( + self.yaml_checks, + processing_results=processing_results, + commit_yaml=commit_yaml, + ), + partial(self.business_logic_checks, report_code=report_code), + ] + + # Loop through all the notification checks. 
Each function should return None unless + # it shouldn't notify, error or it needs to retry + for func in should_notify_check_functions: + result = func(commit=commit, log_extra_dict=log_extra_dict) + if result: + return result + + return ShouldCallNotifyResponse( + notification_result=ShouldCallNotifyResult.NOTIFY, + reason="successful_notification_scheduling", + message="Scheduling notify task", + ) + + def upload_processing_checks( + self, + commit: Commit, + processing_results: list[ProcessingResult], + log_extra_dict: dict, + ) -> Optional[ShouldCallNotifyResponse]: # Checks if there is at least a success + processing_successes = [x["successful"] for x in processing_results] if not any(processing_successes): - return ShouldCallNotifyResult.DO_NOT_NOTIFY + return ShouldCallNotifyResponse( + notification_result=ShouldCallNotifyResult.DO_NOT_NOTIFY, + reason="no_successful_processing", + message="No successful processing", + ) # Determine if there should be anything post_processing + repoid = commit.repoid + commitid = commit.commitid + state = ProcessingState(repoid, commitid) if not should_trigger_postprocessing(state.get_upload_numbers()): - UploadFlow.log(UploadFlow.SKIPPING_NOTIFICATION) - return + return ShouldCallNotifyResponse( + notification_result=ShouldCallNotifyResult.DO_NOT_NOTIFY, + reason="no_postprocessing_needed", + message="No need to trigger postprocessing", + ) # Determines if there are other uploads processing or incoming - if has_upcoming_notifies_according_to_redis( + redis_connection = get_redis_connection() + if self.has_upcoming_notifies_according_to_redis( redis_connection, repoid, commitid ): log.info( "Not notifying because there are seemingly other jobs being processed yet", - extra=dict(repoid=repoid, commitid=commitid), + extra=log_extra_dict, + ) + return ShouldCallNotifyResponse( + notification_result=ShouldCallNotifyResult.DO_NOT_NOTIFY, + reason="has_other_notifications_coming", + message="Not notifying because there are seemingly other jobs being processed yet", ) - self.log_checkpoint(UploadFlow.SKIPPING_NOTIFICATION) - return { - "notified": False, - "notifications": None, - "reason": "has_other_notifies_coming", - } - ##### Yaml settings checks + return None + + def yaml_checks( + self, + commit: Commit, + processing_results: list[ProcessingResult], + commit_yaml: UserYaml, + log_extra_dict: dict, + ) -> Optional[ShouldCallNotifyResponse]: # Checks for manual trigger manual_trigger = read_yaml_field( commit_yaml, ("codecov", "notify", "manual_trigger") @@ -279,9 +312,13 @@ def run_impl_within_lock( if manual_trigger: log.info( "Not scheduling notify because manual trigger is used", - extra=extra_dict, + extra=log_extra_dict, + ) + return ShouldCallNotifyResponse( + notification_result=ShouldCallNotifyResult.DO_NOT_NOTIFY, + reason="has_manual_trigger_yaml_setting", + message="Not scheduling notify because manual trigger is used", ) - return ShouldCallNotifyResult.DO_NOT_NOTIFY # Checks for after_n_builds after_n_builds = ( @@ -292,12 +329,14 @@ def run_impl_within_lock( number_sessions = len(report.sessions) if report is not None else 0 if after_n_builds > number_sessions: log.info( - "Not scheduling notify because `after_n_builds` is %s and we only found %s builds", - after_n_builds, - number_sessions, - extra=extra_dict, + f"Not scheduling notify because `after_n_builds` is {after_n_builds} and we only found {number_sessions} builds", + extra=log_extra_dict, ) - return ShouldCallNotifyResult.DO_NOT_NOTIFY + return { + "notification_result": 
ShouldCallNotifyResult.DO_NOT_NOTIFY, + "reason": "has_after_n_builds_yaml_setting", + "message": f"Not scheduling notify because `after_n_builds` is {after_n_builds} and we only found {number_sessions} builds", + } # Checks for notify_error notify_error = read_yaml_field( @@ -305,36 +344,44 @@ def run_impl_within_lock( ("codecov", "notify", "notify_error"), _else=False, ) - if notify_error and (len(processing_successes) == 0 or not all(processing_successes)): + processing_successes = [x["successful"] for x in processing_results] + if notify_error and ( + len(processing_successes) == 0 or not all(processing_successes) + ): log.info( "Not scheduling notify because there is a non-successful processing result", - extra=extra_dict, + extra=log_extra_dict, + ) + return ShouldCallNotifyResponse( + notification_result=ShouldCallNotifyResult.NOTIFY_ERROR, + reason="has_notify_error_yaml_setting", + message="Not scheduling notify because there is a non-successful processing result", ) - return ShouldCallNotifyResult.NOTIFY_ERROR - ### Checks related to repository_service and ci_results - # ASK: this installation/bot related logic impedes notification if unavailable, but it - # seems correct for it to be part of the notifier task, thoughts? + # Creates repository_service and ci_results to use with yaml settings try: installation_name_to_use = get_installation_name_for_owner_for_task( self.name, commit.repository.owner ) - # This is technically a different repository_service method than the one used in notifications. I + # This is technically a different repository_service method than the one used in the notify task. I # replaced it because the other method is a) deeply intertwined with the notify file and b) it seems # to be extra specific just for the notification aspect of it, so this should suffice for other checks. # Please look corroborate this isn't a red flag. - repository_service = get_repo_provider_service(repository) + repository_service = get_repo_provider_service(commit.repository) except RepositoryWithoutValidBotError: save_commit_error( commit, error_code=CommitErrorTypes.REPO_BOT_INVALID.value ) - log.warning( "Unable to start notifications because repo doesn't have a valid bot", - extra=dict(repoid=repoid, commit=commitid), + extra=log_extra_dict, + ) + UploadFlow.log(UploadFlow.NOTIF_NO_VALID_INTEGRATION) + return ShouldCallNotifyResponse( + notification_result=ShouldCallNotifyResult.DO_NOT_NOTIFY, + reason="has_no_valid_bot", + message="Unable to start notifications because repo doesn't have a valid bot", ) - self.log_checkpoint(UploadFlow.NOTIF_NO_VALID_INTEGRATION) - return {"notified": False, "notifications": None, "reason": "no_valid_bot"} except NoConfiguredAppsAvailable as exp: if exp.rate_limited_count > 0: # There's at least 1 app that we can use to communicate with GitHub, @@ -343,38 +390,38 @@ def run_impl_within_lock( retry_delay_seconds = max(60, get_seconds_to_next_hour()) log.warning( "Unable to start notifications. Retrying again later.", + extra={ + **log_extra_dict, + "apps_available": exp.apps_count, + "apps_rate_limited": exp.rate_limited_count, + "apps_suspended": exp.suspended_count, + "countdown_seconds": retry_delay_seconds, + }, + ) + return ShouldCallNotifyResponse( + notification_result=ShouldCallNotifyResult.WAIT_TO_NOTIFY, + reason="retrying_because_app_is_rate_limited", + message="Unable to start notifications. 
Retrying again later.", extra=dict( - repoid=repoid, - commit=commitid, - apps_available=exp.apps_count, - apps_rate_limited=exp.rate_limited_count, - apps_suspended=exp.suspended_count, - countdown_seconds=retry_delay_seconds, + max_retries=10, + countdown=retry_delay_seconds, ), ) - return self._attempt_retry( - max_retries=10, - countdown=retry_delay_seconds, - current_yaml=commit_yaml, - commit=commit, - **kwargs, - ) # Maybe we have apps that are suspended. We can't communicate with github. log.warning( "We can't find an app to communicate with GitHub. Not notifying.", - extra=dict( - repoid=repoid, - commit=commitid, - apps_available=exp.apps_count, - apps_suspended=exp.suspended_count, - ), + extra={ + **log_extra_dict, + "apps_available": exp.apps_count, + "apps_suspended": exp.suspended_count, + }, + ) + UploadFlow.log(UploadFlow.NOTIF_NO_APP_INSTALLATION) + return ShouldCallNotifyResponse( + notification_result=ShouldCallNotifyResult.DO_NOT_NOTIFY, + reason="no_valid_github_app_found", + message="We can't find an app to communicate with GitHub. Not notifying.", ) - self.log_checkpoint(UploadFlow.NOTIF_NO_APP_INSTALLATION) - return { - "notified": False, - "notifications": None, - "reason": "no_valid_github_app_found", - } try: ci_results = self.fetch_and_update_whether_ci_passed( @@ -383,33 +430,37 @@ def run_impl_within_lock( except TorngitClientError as ex: log.info( "Unable to fetch CI results due to a client problem. Not notifying user", - extra=dict(repoid=commit.repoid, commit=commit.commitid, code=ex.code), + extra={ + **log_extra_dict, + "code": ex.code, + }, + ) + UploadFlow.log(UploadFlow.NOTIF_GIT_CLIENT_ERROR) + return ShouldCallNotifyResponse( + notification_result=ShouldCallNotifyResult.DO_NOT_NOTIFY, + reason="not_able_fetch_ci_result", + message="Unable to fetch CI results due to a client problem. Not notifying user", ) - self.log_checkpoint(UploadFlow.NOTIF_GIT_CLIENT_ERROR) - return { - "notified": False, - "notifications": None, - "reason": "not_able_fetch_ci_result", - } except TorngitServerFailureError: log.info( "Unable to fetch CI results due to server issues. Not notifying user", - extra=dict(repoid=commit.repoid, commit=commit.commitid), + extra=log_extra_dict, + ) + UploadFlow.log(UploadFlow.NOTIF_GIT_SERVICE_ERROR) + return ShouldCallNotifyResponse( + notification_result=ShouldCallNotifyResult.DO_NOT_NOTIFY, + reason="server_issues_ci_result", + message="Unable to fetch CI results due to a client problem. Not notifying user", ) - self.log_checkpoint(UploadFlow.NOTIF_GIT_SERVICE_ERROR) - return { - "notified": False, - "notifications": None, - "reason": "server_issues_ci_result", - } # Check for wait_for_ci based on the CI results and reattempt if true - # should_wait_longer - wait_for_ci = read_yaml_field(commit_yaml, ("codecov", "notify", "wait_for_ci"), True) + wait_for_ci = read_yaml_field( + commit_yaml, ("codecov", "notify", "wait_for_ci"), True + ) if wait_for_ci and ci_results is None: log.info( - "Not sending notifications yet because we are waiting for CI to finish", - extra=dict(repoid=commit.repoid, commit=commit.commitid), + "Not sending notifications yet because we are waiting for CI to finish. 
Attempting retry", + extra=log_extra_dict, ) ghapp_default_installations = list( filter( @@ -434,16 +485,20 @@ def run_impl_within_lock( else: max_retries = 10 countdown = 15 * 2**self.request.retries - return self._attempt_retry( - max_retries=max_retries, - countdown=countdown, - current_yaml=commit_yaml, - commit=commit, - **kwargs, + return ShouldCallNotifyResponse( + notification_result=ShouldCallNotifyResult.WAIT_TO_NOTIFY, + reason="retrying_because_wait_for_ci", + message="Not sending notifications yet because we are waiting for CI to finish. Attempting retry", + extra=dict( + max_retries=max_retries, + countdown=countdown, + ), ) - # should_send_notifications require_ci_to_pass - require_ci_to_pass = read_yaml_field(commit_yaml, ("codecov", "require_ci_to_pass"), True) + # Check for require_ci_to_pass if ci_results is false + require_ci_to_pass = read_yaml_field( + commit_yaml, ("codecov", "require_ci_to_pass"), True + ) if require_ci_to_pass and ci_results is False: self.app.tasks[status_set_error_task_name].apply_async( args=None, @@ -452,51 +507,54 @@ def run_impl_within_lock( ), ) log.info( - "Not sending notifications because CI failed", - extra=dict(repoid=commit.repoid, commit=commit.commitid), + "Not sending notifications because CI failed", extra=log_extra_dict + ) + return ShouldCallNotifyResponse( + notification_result=ShouldCallNotifyResult.DO_NOT_NOTIFY, + reason="has_require_ci_to_pass_yaml_setting_and_no_ci_results", + message="Not sending notifications because CI failed", ) - return False + return None - ##### Business logic + def business_logic_checks( + self, commit: Commit, log_extra_dict: dict, report_code: str | None = None + ): # Notifications should be off in case of local uploads, and report code wouldn't be null in that case if report_code is not None: log.info( "Not scheduling notify because it's a local upload", - extra=extra_dict, + extra=log_extra_dict, + ) + return ShouldCallNotifyResponse( + notification_result=ShouldCallNotifyResult.DO_NOT_NOTIFY, + reason="has_require_ci_to_pass_yaml_setting_and_no_ci_results", + message="Not scheduling notify because it's a local upload", ) - return ShouldCallNotifyResult.DO_NOT_NOTIFY # Some check on CI skipping from some regex? 
Idk what this is if regexp_ci_skip.search(commit.message or ""): commit.state = "skipped" + log.info( + "Not scheduling notify because regex wants to skip ci", + extra=log_extra_dict, + ) + return ShouldCallNotifyResponse( + notification_result=ShouldCallNotifyResult.DO_NOT_NOTIFY, + reason="has_has_regexp_ci_skip", + message="Not scheduling notify because regex wants to skip ci", + ) + return None - - # If it got here, it should notify - notifications_called = True - notify_kwargs = { - "repoid": repoid, - "commitid": commitid, - "current_yaml": commit_yaml.to_dict(), - } - notify_kwargs = UploadFlow.save_to_kwargs(notify_kwargs) - task = self.app.tasks[notify_task_name].apply_async( - kwargs=notify_kwargs - ) - log.info( - "Scheduling notify task", - extra=dict( - repoid=repoid, - commit=commitid, - commit_yaml=commit_yaml.to_dict(), - processing_results=processing_results, - notify_task_id=task.id, - parent_task=self.request.parent_id, - ), - ) + def orchestrator_side_effects( + self, + db_session: Session, + commit: Commit, + ): if commit.pullid: + repoid = commit.repoid pull = ( db_session.query(Pull) - .filter_by(repoid=commit.repoid, pullid=commit.pullid) + .filter_by(repoid=repoid, pullid=commit.pullid) .first() ) if pull: @@ -518,727 +576,56 @@ def run_impl_within_lock( db_session, compared_to, commit ) db_session.commit() - self.app.tasks[ - compute_comparison_task_name - ].apply_async( + self.app.tasks[compute_comparison_task_name].apply_async( kwargs=dict(comparison_id=comparison.id) ) - - # # ASK: this installation/bot related logic impedes notification if unavailable, but it - # # seems correct for it to be part of the notifier task, thoughts? - # try: - # installation_name_to_use = get_installation_name_for_owner_for_task( - # self.name, commit.repository.owner - # ) - # repository_service = get_repo_provider_service_for_specific_commit( - # commit, installation_name_to_use - # ) - # except RepositoryWithoutValidBotError: - # save_commit_error( - # commit, error_code=CommitErrorTypes.REPO_BOT_INVALID.value - # ) - - # log.warning( - # "Unable to start notifications because repo doesn't have a valid bot", - # extra=dict(repoid=repoid, commit=commitid), - # ) - # self.log_checkpoint(UploadFlow.NOTIF_NO_VALID_INTEGRATION) - # return {"notified": False, "notifications": None, "reason": "no_valid_bot"} - # except NoConfiguredAppsAvailable as exp: - # if exp.rate_limited_count > 0: - # # There's at least 1 app that we can use to communicate with GitHub, - # # but this app happens to be rate limited now. We try again later. - # # Min wait time of 1 minute - # retry_delay_seconds = max(60, get_seconds_to_next_hour()) - # log.warning( - # "Unable to start notifications. Retrying again later.", - # extra=dict( - # repoid=repoid, - # commit=commitid, - # apps_available=exp.apps_count, - # apps_rate_limited=exp.rate_limited_count, - # apps_suspended=exp.suspended_count, - # countdown_seconds=retry_delay_seconds, - # ), - # ) - # return self._attempt_retry( - # max_retries=10, - # countdown=retry_delay_seconds, - # current_yaml=current_yaml, - # commit=commit, - # **kwargs, - # ) - # # Maybe we have apps that are suspended. We can't communicate with github. - # log.warning( - # "We can't find an app to communicate with GitHub. 
Not notifying.", - # extra=dict( - # repoid=repoid, - # commit=commitid, - # apps_available=exp.apps_count, - # apps_suspended=exp.suspended_count, - # ), - # ) - # self.log_checkpoint(UploadFlow.NOTIF_NO_APP_INSTALLATION) - # return { - # "notified": False, - # "notifications": None, - # "reason": "no_valid_github_app_found", - # } - - # if current_yaml is None: - # current_yaml = async_to_sync(get_current_yaml)(commit, repository_service) - # else: - # current_yaml = UserYaml.from_dict(current_yaml) - - # try: - # ci_results = self.fetch_and_update_whether_ci_passed( - # repository_service, commit, current_yaml - # ) - # except TorngitClientError as ex: - # log.info( - # "Unable to fetch CI results due to a client problem. Not notifying user", - # extra=dict(repoid=commit.repoid, commit=commit.commitid, code=ex.code), - # ) - # self.log_checkpoint(UploadFlow.NOTIF_GIT_CLIENT_ERROR) - # return { - # "notified": False, - # "notifications": None, - # "reason": "not_able_fetch_ci_result", - # } - # except TorngitServerFailureError: - # log.info( - # "Unable to fetch CI results due to server issues. Not notifying user", - # extra=dict(repoid=commit.repoid, commit=commit.commitid), - # ) - # self.log_checkpoint(UploadFlow.NOTIF_GIT_SERVICE_ERROR) - # return { - # "notified": False, - # "notifications": None, - # "reason": "server_issues_ci_result", - # } - - # Should not notify based on type of upload (empty upload) - # Should not notify based on pending notifications - - - - - if not notifications_called: - UploadFlow.log(UploadFlow.SKIPPING_NOTIFICATION) - - - - - - ############## From processing - - log.debug("In finish_reports_processing for commit: %s" % commit) - commitid = commit.commitid - repoid = commit.repoid - - # always notify, let the notify handle if it should submit - notifications_called = False - if not regexp_ci_skip.search(commit.message or ""): - match self.should_call_notifications( - commit, commit_yaml, processing_results, report_code - ): - case ShouldCallNotifyResult.NOTIFY: - notifications_called = True - notify_kwargs = { - "repoid": repoid, - "commitid": commitid, - "current_yaml": commit_yaml.to_dict(), - } - notify_kwargs = UploadFlow.save_to_kwargs(notify_kwargs) - task = self.app.tasks[notify_task_name].apply_async( - kwargs=notify_kwargs - ) - log.info( - "Scheduling notify task", - extra=dict( - repoid=repoid, - commit=commitid, - commit_yaml=commit_yaml.to_dict(), - processing_results=processing_results, - notify_task_id=task.id, - parent_task=self.request.parent_id, - ), - ) - if commit.pullid: - pull = ( - db_session.query(Pull) - .filter_by(repoid=commit.repoid, pullid=commit.pullid) - .first() - ) - if pull: - head = pull.get_head_commit() - if head is None or head.timestamp <= commit.timestamp: - pull.head = commit.commitid - if pull.head == commit.commitid: - db_session.commit() - self.app.tasks[pulls_task_name].apply_async( - kwargs=dict( - repoid=repoid, - pullid=pull.pullid, - should_send_notifications=False, - ) - ) - compared_to = pull.get_comparedto_commit() - if compared_to: - comparison = get_or_create_comparison( - db_session, compared_to, commit - ) - db_session.commit() - self.app.tasks[ - compute_comparison_task_name - ].apply_async( - kwargs=dict(comparison_id=comparison.id) - ) - case ShouldCallNotifyResult.DO_NOT_NOTIFY: - notifications_called = False - log.info( - "Skipping notify task", - extra=dict( - repoid=repoid, - commit=commitid, - commit_yaml=commit_yaml.to_dict(), - processing_results=processing_results, - 
parent_task=self.request.parent_id, - ), - ) - case ShouldCallNotifyResult.NOTIFY_ERROR: - notifications_called = False - notify_error_kwargs = { - "repoid": repoid, - "commitid": commitid, - "current_yaml": commit_yaml.to_dict(), - } - notify_error_kwargs = UploadFlow.save_to_kwargs(notify_error_kwargs) - task = self.app.tasks[notify_error_task_name].apply_async( - kwargs=notify_error_kwargs - ) - else: - commit.state = "skipped" - - UploadFlow.log(UploadFlow.PROCESSING_COMPLETE) - if not notifications_called: - UploadFlow.log(UploadFlow.SKIPPING_NOTIFICATION) - - return {"notifications_called": notifications_called} - # except LockError: - # log.warning("Unable to acquire lock", extra=dict(lock_name=lock_name)) - # UploadFlow.log(UploadFlow.FINISHER_LOCK_ERROR) - - ############## From processing - - ######### Possibly move this logic to the orchestrator task as a requirement - try: - installation_name_to_use = get_installation_name_for_owner_for_task( - self.name, commit.repository.owner - ) - repository_service = get_repo_provider_service_for_specific_commit( - commit, installation_name_to_use - ) - except RepositoryWithoutValidBotError: - save_commit_error( - commit, error_code=CommitErrorTypes.REPO_BOT_INVALID.value - ) - - log.warning( - "Unable to start notifications because repo doesn't have a valid bot", - extra=dict(repoid=repoid, commit=commitid), - ) - self.log_checkpoint(UploadFlow.NOTIF_NO_VALID_INTEGRATION) - return {"notified": False, "notifications": None, "reason": "no_valid_bot"} - except NoConfiguredAppsAvailable as exp: - if exp.rate_limited_count > 0: - # There's at least 1 app that we can use to communicate with GitHub, - # but this app happens to be rate limited now. We try again later. - # Min wait time of 1 minute - retry_delay_seconds = max(60, get_seconds_to_next_hour()) - log.warning( - "Unable to start notifications. Retrying again later.", - extra=dict( - repoid=repoid, - commit=commitid, - apps_available=exp.apps_count, - apps_rate_limited=exp.rate_limited_count, - apps_suspended=exp.suspended_count, - countdown_seconds=retry_delay_seconds, - ), - ) - return self._attempt_retry( - max_retries=10, - countdown=retry_delay_seconds, - current_yaml=current_yaml, - commit=commit, - **kwargs, - ) - # Maybe we have apps that are suspended. We can't communicate with github. - log.warning( - "We can't find an app to communicate with GitHub. Not notifying.", - extra=dict( - repoid=repoid, - commit=commitid, - apps_available=exp.apps_count, - apps_suspended=exp.suspended_count, - ), - ) - self.log_checkpoint(UploadFlow.NOTIF_NO_APP_INSTALLATION) - return { - "notified": False, - "notifications": None, - "reason": "no_valid_github_app_found", - } - - if current_yaml is None: - current_yaml = async_to_sync(get_current_yaml)(commit, repository_service) - else: - current_yaml = UserYaml.from_dict(current_yaml) - - try: - ci_results = self.fetch_and_update_whether_ci_passed( - repository_service, commit, current_yaml - ) - except TorngitClientError as ex: - log.info( - "Unable to fetch CI results due to a client problem. Not notifying user", - extra=dict(repoid=commit.repoid, commit=commit.commitid, code=ex.code), - ) - self.log_checkpoint(UploadFlow.NOTIF_GIT_CLIENT_ERROR) - return { - "notified": False, - "notifications": None, - "reason": "not_able_fetch_ci_result", - } - except TorngitServerFailureError: - log.info( - "Unable to fetch CI results due to server issues. 
Not notifying user", - extra=dict(repoid=commit.repoid, commit=commit.commitid), - ) - self.log_checkpoint(UploadFlow.NOTIF_GIT_SERVICE_ERROR) - return { - "notified": False, - "notifications": None, - "reason": "server_issues_ci_result", - } - if self.should_wait_longer(current_yaml, commit, ci_results): - log.info( - "Not sending notifications yet because we are waiting for CI to finish", - extra=dict(repoid=commit.repoid, commit=commit.commitid), - ) - ghapp_default_installations = list( - filter( - lambda obj: obj.name == installation_name_to_use - and obj.is_configured(), - commit.repository.owner.github_app_installations or [], - ) - ) - rely_on_webhook_ghapp = ghapp_default_installations != [] and any( - obj.is_repo_covered_by_integration(commit.repository) - for obj in ghapp_default_installations - ) - rely_on_webhook_legacy = commit.repository.using_integration - if ( - rely_on_webhook_ghapp - or rely_on_webhook_legacy - or commit.repository.hookid - ): - # rely on the webhook, but still retry in case we miss the webhook - max_retries = 5 - countdown = (60 * 3) * 2**self.request.retries - else: - max_retries = 10 - countdown = 15 * 2**self.request.retries - return self._attempt_retry( - max_retries=max_retries, - countdown=countdown, - current_yaml=current_yaml, - commit=commit, - **kwargs, - ) - - report_service = ReportService( - current_yaml, gh_app_installation_name=installation_name_to_use - ) - head_report = report_service.get_existing_report_for_commit( - commit, report_class=ReadOnlyReport - ) - if self.should_send_notifications( - current_yaml, commit, ci_results, head_report - ): - enriched_pull = async_to_sync( - fetch_and_update_pull_request_information_from_commit - )(repository_service, commit, current_yaml) - if enriched_pull and enriched_pull.database_pull: - pull = enriched_pull.database_pull - base_commit = self.fetch_pull_request_base(pull) - else: - pull = None - base_commit = self.fetch_parent(commit) - - if ( - enriched_pull - and not self.send_notifications_if_commit_differs_from_pulls_head( - commit, enriched_pull, current_yaml - ) - and empty_upload is None - ): - log.info( - "Not sending notifications for commit when it differs from pull's most recent head", - extra=dict( - commit=commit.commitid, - repoid=commit.repoid, - current_yaml=current_yaml.to_dict(), - pull_head=enriched_pull.provider_pull["head"]["commitid"], - ), - ) - self.log_checkpoint(UploadFlow.NOTIF_STALE_HEAD) - return { - "notified": False, - "notifications": None, - "reason": "User doesnt want notifications warning them that current head differs from pull request most recent head.", - } - - if base_commit is not None: - base_report = report_service.get_existing_report_for_commit( - base_commit, report_class=ReadOnlyReport - ) - else: - base_report = None - if head_report is None and empty_upload is None: - self.log_checkpoint(UploadFlow.NOTIF_ERROR_NO_REPORT) - return { - "notified": False, - "notifications": None, - "reason": "no_head_report", - } - - ######### Possibly move this logic to the orchestrator task as a requirement - - - - # log.info("Starting notifications", extra=dict(commit=commitid, repoid=repoid)) - # commits_query = db_session.query(Commit).filter( - # Commit.repoid == repoid, Commit.commitid == commitid - # ) - # commit: Commit = commits_query.first() - # assert commit, "Commit not found in database." 
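The commented-out block below still includes the early return on failing test results (the "test_failures" reason) that the notify task performs today. If that gate is meant to move into the orchestrator as well, a rough, hypothetical sketch of how it could slot in alongside business_logic_checks, reusing the names from the commented code:

    # Hypothetical sketch, not part of this patch: mirrors the notify task's
    # test-results gate so the orchestrator could skip scheduling notify when tests failed.
    def test_results_check(
        self, commit: Commit, log_extra_dict: dict
    ) -> Optional[ShouldCallNotifyResponse]:
        test_result_commit_report = commit.commit_report(ReportType.TEST_RESULTS)
        if (
            test_result_commit_report is not None
            and test_result_commit_report.test_result_totals is not None
            and not test_result_commit_report.test_result_totals.error
            and test_result_commit_report.test_result_totals.failed > 0
        ):
            log.info(
                "Not notifying because there are failing tests", extra=log_extra_dict
            )
            return ShouldCallNotifyResponse(
                notification_result=ShouldCallNotifyResult.DO_NOT_NOTIFY,
                reason="test_failures",
                message="Not notifying because there are failing tests",
            )
        return None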
- - # test_result_commit_report = commit.commit_report(ReportType.TEST_RESULTS) - # if ( - # test_result_commit_report is not None - # and test_result_commit_report.test_result_totals is not None - # and not test_result_commit_report.test_result_totals.error - # and test_result_commit_report.test_result_totals.failed > 0 - # ): - # return { - # "notify_attempted": False, - # "notifications": None, - # "reason": "test_failures", - # } - - # try: - # installation_name_to_use = get_installation_name_for_owner_for_task( - # self.name, commit.repository.owner - # ) - # repository_service = get_repo_provider_service_for_specific_commit( - # commit, installation_name_to_use - # ) - # except RepositoryWithoutValidBotError: - # save_commit_error( - # commit, error_code=CommitErrorTypes.REPO_BOT_INVALID.value - # ) - - # log.warning( - # "Unable to start notifications because repo doesn't have a valid bot", - # extra=dict(repoid=repoid, commit=commitid), - # ) - # self.log_checkpoint(UploadFlow.NOTIF_NO_VALID_INTEGRATION) - # return {"notified": False, "notifications": None, "reason": "no_valid_bot"} - # except NoConfiguredAppsAvailable as exp: - # if exp.rate_limited_count > 0: - # # There's at least 1 app that we can use to communicate with GitHub, - # # but this app happens to be rate limited now. We try again later. - # # Min wait time of 1 minute - # retry_delay_seconds = max(60, get_seconds_to_next_hour()) - # log.warning( - # "Unable to start notifications. Retrying again later.", - # extra=dict( - # repoid=repoid, - # commit=commitid, - # apps_available=exp.apps_count, - # apps_rate_limited=exp.rate_limited_count, - # apps_suspended=exp.suspended_count, - # countdown_seconds=retry_delay_seconds, - # ), - # ) - # return self._attempt_retry( - # max_retries=10, - # countdown=retry_delay_seconds, - # current_yaml=current_yaml, - # commit=commit, - # **kwargs, - # ) - # # Maybe we have apps that are suspended. We can't communicate with github. - # log.warning( - # "We can't find an app to communicate with GitHub. Not notifying.", - # extra=dict( - # repoid=repoid, - # commit=commitid, - # apps_available=exp.apps_count, - # apps_suspended=exp.suspended_count, - # ), - # ) - # self.log_checkpoint(UploadFlow.NOTIF_NO_APP_INSTALLATION) - # return { - # "notified": False, - # "notifications": None, - # "reason": "no_valid_github_app_found", - # } - - # if current_yaml is None: - # current_yaml = async_to_sync(get_current_yaml)(commit, repository_service) - # else: - # current_yaml = UserYaml.from_dict(current_yaml) - - # try: - # ci_results = self.fetch_and_update_whether_ci_passed( - # repository_service, commit, current_yaml - # ) - # except TorngitClientError as ex: - # log.info( - # "Unable to fetch CI results due to a client problem. Not notifying user", - # extra=dict(repoid=commit.repoid, commit=commit.commitid, code=ex.code), - # ) - # self.log_checkpoint(UploadFlow.NOTIF_GIT_CLIENT_ERROR) - # return { - # "notified": False, - # "notifications": None, - # "reason": "not_able_fetch_ci_result", - # } - # except TorngitServerFailureError: - # log.info( - # "Unable to fetch CI results due to server issues. 
Not notifying user", - # extra=dict(repoid=commit.repoid, commit=commit.commitid), - # ) - # self.log_checkpoint(UploadFlow.NOTIF_GIT_SERVICE_ERROR) - # return { - # "notified": False, - # "notifications": None, - # "reason": "server_issues_ci_result", - # } - # if self.should_wait_longer(current_yaml, commit, ci_results): - # log.info( - # "Not sending notifications yet because we are waiting for CI to finish", - # extra=dict(repoid=commit.repoid, commit=commit.commitid), - # ) - # ghapp_default_installations = list( - # filter( - # lambda obj: obj.name == installation_name_to_use - # and obj.is_configured(), - # commit.repository.owner.github_app_installations or [], - # ) - # ) - # rely_on_webhook_ghapp = ghapp_default_installations != [] and any( - # obj.is_repo_covered_by_integration(commit.repository) - # for obj in ghapp_default_installations - # ) - # rely_on_webhook_legacy = commit.repository.using_integration - # if ( - # rely_on_webhook_ghapp - # or rely_on_webhook_legacy - # or commit.repository.hookid - # ): - # # rely on the webhook, but still retry in case we miss the webhook - # max_retries = 5 - # countdown = (60 * 3) * 2**self.request.retries - # else: - # max_retries = 10 - # countdown = 15 * 2**self.request.retries - # return self._attempt_retry( - # max_retries=max_retries, - # countdown=countdown, - # current_yaml=current_yaml, - # commit=commit, - # **kwargs, - # ) - - # report_service = ReportService( - # current_yaml, gh_app_installation_name=installation_name_to_use - # ) - # head_report = report_service.get_existing_report_for_commit( - # commit, report_class=ReadOnlyReport - # ) - # if self.should_send_notifications( - # current_yaml, commit, ci_results, head_report - # ): - # enriched_pull = async_to_sync( - # fetch_and_update_pull_request_information_from_commit - # )(repository_service, commit, current_yaml) - # if enriched_pull and enriched_pull.database_pull: - # pull = enriched_pull.database_pull - # base_commit = self.fetch_pull_request_base(pull) - # else: - # pull = None - # base_commit = self.fetch_parent(commit) - - # if ( - # enriched_pull - # and not self.send_notifications_if_commit_differs_from_pulls_head( - # commit, enriched_pull, current_yaml - # ) - # and empty_upload is None - # ): - # log.info( - # "Not sending notifications for commit when it differs from pull's most recent head", - # extra=dict( - # commit=commit.commitid, - # repoid=commit.repoid, - # current_yaml=current_yaml.to_dict(), - # pull_head=enriched_pull.provider_pull["head"]["commitid"], - # ), - # ) - # self.log_checkpoint(UploadFlow.NOTIF_STALE_HEAD) - # return { - # "notified": False, - # "notifications": None, - # "reason": "User doesnt want notifications warning them that current head differs from pull request most recent head.", - # } - - # if base_commit is not None: - # base_report = report_service.get_existing_report_for_commit( - # base_commit, report_class=ReadOnlyReport - # ) - # else: - # base_report = None - # if head_report is None and empty_upload is None: - # self.log_checkpoint(UploadFlow.NOTIF_ERROR_NO_REPORT) - # return { - # "notified": False, - # "notifications": None, - # "reason": "no_head_report", - # } - - # if commit.repository.service == "gitlab": - # gitlab_extra_shas_to_notify = self.get_gitlab_extra_shas_to_notify( - # commit, repository_service - # ) - # else: - # gitlab_extra_shas_to_notify = None - - # log.info( - # "We are going to be sending notifications", - # extra=dict( - # commit=commit.commitid, - # repoid=commit.repoid, - # 
current_yaml=current_yaml.to_dict(), - # ), - # ) - # notifications = self.submit_third_party_notifications( - # current_yaml, - # base_commit, - # commit, - # base_report, - # head_report, - # enriched_pull, - # repository_service, - # empty_upload, - # all_tests_passed=( - # test_result_commit_report is not None - # and test_result_commit_report.test_result_totals is not None - # and test_result_commit_report.test_result_totals.error is None - # and test_result_commit_report.test_result_totals.failed == 0 - # ), - # test_results_error=( - # test_result_commit_report is not None - # and test_result_commit_report.test_result_totals is not None - # and test_result_commit_report.test_result_totals.error - # ), - # installation_name_to_use=installation_name_to_use, - # gh_is_using_codecov_commenter=self.is_using_codecov_commenter( - # repository_service - # ), - # gitlab_extra_shas_to_notify=gitlab_extra_shas_to_notify, - # ) - # self.log_checkpoint(UploadFlow.NOTIFIED) - # log.info( - # "Notifications done", - # extra=dict( - # notifications=notifications, - # notification_count=len(notifications), - # commit=commit.commitid, - # repoid=commit.repoid, - # pullid=pull.pullid if pull is not None else None, - # ), - # ) - # db_session.commit() - # return {"notified": True, "notifications": notifications} - # else: - # log.info( - # "Not sending notifications at all", - # extra=dict(commit=commit.commitid, repoid=commit.repoid), - # ) - # self.log_checkpoint(UploadFlow.SKIPPING_NOTIFICATION) - # return {"notified": False, "notifications": None} - - - - - # def log_checkpoint(self, checkpoint): - # """ - # Only log a checkpoint if whoever scheduled us sent checkpoints data from - # the same flow. - - # The notify task is an important part of `UploadFlow`, but it's also used - # elsewhere. If this instance of the notify task wasn't scheduled as part - # of upload processing, attempting to log `UploadFlow` checkpoints for it - # will pollute our metrics. 
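Related question on the checkpoint changes above: several call sites now call UploadFlow.log(...) directly where they previously went through this guard. If the orchestrator can ever be scheduled outside of upload processing, keeping a small helper along the lines of the one being removed may still be worthwhile (sketch, using the UploadFlow.has_begun() check referenced in the deleted code):

    # Sketch of the guard described above: only emit UploadFlow checkpoints when this
    # task was scheduled as part of an upload flow.
    def log_checkpoint(self, checkpoint):
        if UploadFlow.has_begun():
            UploadFlow.log(checkpoint)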
- # """ - # if UploadFlow.has_begun(): - # UploadFlow.log(checkpoint) - - # Notify task signature - # notify_kwargs = { - # "repoid": repoid, - # "commitid": commitid, - # "current_yaml": commit_yaml.to_dict(), - # } - # notify_kwargs = UploadFlow.save_to_kwargs(notify_kwargs) - # task = self.app.tasks[notify_task_name].apply_async( - # kwargs=notify_kwargs - # ) - - def _attempt_retry( - self, - max_retries: int, - countdown: int, - commit: Commit, - current_yaml: Optional[UserYaml], - *args, - **kwargs, + self, max_retries: int, countdown: int, log_extra_dict: dict ) -> None: try: self.retry(max_retries=max_retries, countdown=countdown) except MaxRetriesExceededError: log.warning( "Not attempting to retry notifications since we already retried too many times", - extra=dict( - repoid=commit.repoid, - commit=commit.commitid, - max_retries=max_retries, - next_countdown_would_be=countdown, - current_yaml=current_yaml.to_dict(), - ), + extra={ + **log_extra_dict, + "max_retries": max_retries, + "next_countdown_would_be": countdown, + }, ) - self.log_checkpoint(UploadFlow.NOTIF_TOO_MANY_RETRIES) + UploadFlow.log(UploadFlow.NOTIF_TOO_MANY_RETRIES) return { - "notified": False, - "notifications": None, + "notification_result": ShouldCallNotifyResult.DO_NOT_NOTIFY, "reason": "too_many_retries", + "message": "Not attempting to retry notifications since we already retried too many times", } -RegisteredNotificationOrchestratorTask = celery_app.register_task(NotificationOrchestratorTask()) -notification_orchestrator_task = celery_app.tasks[RegisteredNotificationOrchestratorTask.name] \ No newline at end of file + def has_upcoming_notifies_according_to_redis( + self, redis_connection: Redis, repoid: int, commitid: str + ) -> bool: + """Checks whether there are any jobs processing according to Redis right now and, + therefore, whether more up-to-date notifications will come after this anyway + + It's very important to have this code be conservative against saying + there are upcoming notifies already. 
The point of this code is to + avoid extra notifications for efficiency purposes, but it is better + to send extra notifications than to lack notifications + + Args: + redis_connection (Redis): The redis connection we check against + repoid (int): The repoid of the commit + commitid (str): The commitid of the commit + """ + upload_processing_lock_name = UPLOAD_PROCESSING_LOCK_NAME(repoid, commitid) + if redis_connection.get(upload_processing_lock_name): + return True + return False + + +RegisteredNotificationOrchestratorTask = celery_app.register_task( + NotificationOrchestratorTask() +) +notification_orchestrator_task = celery_app.tasks[ + RegisteredNotificationOrchestratorTask.name +] diff --git a/tasks/notify.py b/tasks/notify.py index 55da99eb4..419983e46 100644 --- a/tasks/notify.py +++ b/tasks/notify.py @@ -43,7 +43,6 @@ from services.github import get_github_app_for_commit, set_github_app_for_commit from services.lock_manager import LockManager, LockRetry, LockType from services.notification import NotificationService -from services.redis import Redis, get_redis_connection from services.report import ReportService from services.repository import ( EnrichedPull, @@ -53,7 +52,6 @@ ) from services.yaml import get_current_yaml, read_yaml_field from tasks.base import BaseCodecovTask -from tasks.upload_processor import UPLOAD_PROCESSING_LOCK_NAME log = logging.getLogger(__name__) @@ -71,21 +69,6 @@ def run_impl( empty_upload=None, **kwargs, ): - redis_connection = get_redis_connection() - if self.has_upcoming_notifies_according_to_redis( - redis_connection, repoid, commitid - ): - log.info( - "Not notifying because there are seemingly other jobs being processed yet", - extra=dict(repoid=repoid, commitid=commitid), - ) - self.log_checkpoint(UploadFlow.SKIPPING_NOTIFICATION) - return { - "notified": False, - "notifications": None, - "reason": "has_other_notifies_coming", - } - lock_manager = LockManager( repoid=repoid, commitid=commitid, @@ -199,7 +182,7 @@ def run_impl_within_lock( } # ASK: this installation/bot related logic impedes notification if unavailable, but it - # seems correct for it to be part of the notifier task, thoughts? + # seems correct for it to be part of the notifier task, thoughts? try: installation_name_to_use = get_installation_name_for_owner_for_task( self.name, commit.repository.owner @@ -218,91 +201,11 @@ def run_impl_within_lock( self.log_checkpoint(UploadFlow.NOTIF_NO_VALID_INTEGRATION) return {"notified": False, "notifications": None, "reason": "no_valid_bot"} - # except NoConfiguredAppsAvailable as exp: - # # Maybe we have apps that are suspended. We can't communicate with github. - # log.warning( - # "We can't find an app to communicate with GitHub. Not notifying.", - # extra=dict( - # repoid=repoid, - # commit=commitid, - # apps_available=exp.apps_count, - # apps_suspended=exp.suspended_count, - # ), - # ) - # self.log_checkpoint(UploadFlow.NOTIF_NO_APP_INSTALLATION) - # return { - # "notified": False, - # "notifications": None, - # "reason": "no_valid_github_app_found", - # } - if current_yaml is None: current_yaml = async_to_sync(get_current_yaml)(commit, repository_service) else: current_yaml = UserYaml.from_dict(current_yaml) - # try: - # ci_results = self.fetch_and_update_whether_ci_passed( - # repository_service, commit, current_yaml - # ) - # except TorngitClientError as ex: - # log.info( - # "Unable to fetch CI results due to a client problem. 
Not notifying user", - # extra=dict(repoid=commit.repoid, commit=commit.commitid, code=ex.code), - # ) - # self.log_checkpoint(UploadFlow.NOTIF_GIT_CLIENT_ERROR) - # return { - # "notified": False, - # "notifications": None, - # "reason": "not_able_fetch_ci_result", - # } - # except TorngitServerFailureError: - # log.info( - # "Unable to fetch CI results due to server issues. Not notifying user", - # extra=dict(repoid=commit.repoid, commit=commit.commitid), - # ) - # self.log_checkpoint(UploadFlow.NOTIF_GIT_SERVICE_ERROR) - # return { - # "notified": False, - # "notifications": None, - # "reason": "server_issues_ci_result", - # } - # if self.should_wait_longer(current_yaml, commit, ci_results): - # log.info( - # "Not sending notifications yet because we are waiting for CI to finish", - # extra=dict(repoid=commit.repoid, commit=commit.commitid), - # ) - # ghapp_default_installations = list( - # filter( - # lambda obj: obj.name == installation_name_to_use - # and obj.is_configured(), - # commit.repository.owner.github_app_installations or [], - # ) - # ) - # rely_on_webhook_ghapp = ghapp_default_installations != [] and any( - # obj.is_repo_covered_by_integration(commit.repository) - # for obj in ghapp_default_installations - # ) - # rely_on_webhook_legacy = commit.repository.using_integration - # if ( - # rely_on_webhook_ghapp - # or rely_on_webhook_legacy - # or commit.repository.hookid - # ): - # # rely on the webhook, but still retry in case we miss the webhook - # max_retries = 5 - # countdown = (60 * 3) * 2**self.request.retries - # else: - # max_retries = 10 - # countdown = 15 * 2**self.request.retries - # return self._attempt_retry( - # max_retries=max_retries, - # countdown=countdown, - # current_yaml=current_yaml, - # commit=commit, - # **kwargs, - # ) - report_service = ReportService( current_yaml, gh_app_installation_name=installation_name_to_use ) @@ -414,13 +317,6 @@ def run_impl_within_lock( ) db_session.commit() return {"notified": True, "notifications": notifications} - # else: - # log.info( - # "Not sending notifications at all", - # extra=dict(commit=commit.commitid, repoid=commit.repoid), - # ) - # self.log_checkpoint(UploadFlow.SKIPPING_NOTIFICATION) - # return {"notified": False, "notifications": None} def is_using_codecov_commenter( self, repository_service: TorngitBaseAdapter @@ -438,27 +334,6 @@ def is_using_codecov_commenter( == commenter_bot_token ) - def has_upcoming_notifies_according_to_redis( - self, redis_connection: Redis, repoid: int, commitid: str - ) -> bool: - """Checks whether there are any jobs processing according to Redis right now and, - therefore, whether more up-to-date notifications will come after this anyway - - It's very important to have this code be conservative against saying - there are upcoming notifies already. The point of this code is to - avoid extra notifications for efficiency purposes, but it is better - to send extra notifications than to lack notifications - - Args: - redis_connection (Redis): The redis connection we check against - repoid (int): The repoid of the commit - commitid (str): The commitid of the commit - """ - upload_processing_lock_name = UPLOAD_PROCESSING_LOCK_NAME(repoid, commitid) - if redis_connection.get(upload_processing_lock_name): - return True - return False - @sentry_sdk.trace def save_patch_totals(self, comparison: ComparisonProxy) -> None: """Saves patch coverage to the CompareCommit, if it exists. 
diff --git a/tasks/upload_finisher.py b/tasks/upload_finisher.py index 35d12312f..7f1404060 100644 --- a/tasks/upload_finisher.py +++ b/tasks/upload_finisher.py @@ -40,7 +40,6 @@ log = logging.getLogger(__name__) - class UploadFinisherTask(BaseCodecovTask, name=upload_finisher_task_name): """This is the third task of the series of tasks designed to process an `upload` made by the user @@ -131,9 +130,11 @@ def run_impl( "commitid": commitid, "commit_yaml": commit_yaml.to_dict(), "processing_results": processing_results, - "report_code": report_code + "report_code": report_code, } - notification_orchestrator_kwargs = UploadFlow.save_to_kwargs(notification_orchestrator_kwargs) + notification_orchestrator_kwargs = UploadFlow.save_to_kwargs( + notification_orchestrator_kwargs + ) # TODO: add log to add the notification orchestrator task self.app.tasks[notification_orchestrator_task_name].apply_async( kwargs=notification_orchestrator_kwargs @@ -172,6 +173,7 @@ def invalidate_caches(self, redis_connection, commit: Commit): if commit.branch == repository.branch: redis_connection.hdel("badge", ("%s:" % key).lower()) + def get_report_lock(repoid: int, commitid: str, hard_time_limit: int) -> Lock: lock_name = UPLOAD_PROCESSING_LOCK_NAME(repoid, commitid) redis_connection = get_redis_connection() @@ -246,7 +248,6 @@ def load_commit_diff(commit: Commit, task_name: str | None = None) -> dict | Non commit, error_code=CommitErrorTypes.REPO_BOT_INVALID.value, ) - log.warning( "Could not apply diff to report because there is no valid bot found for that repo", exc_info=True, @@ -254,5 +255,6 @@ def load_commit_diff(commit: Commit, task_name: str | None = None) -> dict | Non return None + RegisteredUploadTask = celery_app.register_task(UploadFinisherTask()) -upload_finisher_task = celery_app.tasks[RegisteredUploadTask.name] \ No newline at end of file +upload_finisher_task = celery_app.tasks[RegisteredUploadTask.name] From e5dda5ae52b1248e7971f17d2f00ef87c1720d6c Mon Sep 17 00:00:00 2001 From: Adrian Date: Mon, 6 Jan 2025 17:17:48 -0600 Subject: [PATCH 3/3] adjust logic --- tasks/notification_orchestrator.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/tasks/notification_orchestrator.py b/tasks/notification_orchestrator.py index 24cab294c..b6eb00c64 100644 --- a/tasks/notification_orchestrator.py +++ b/tasks/notification_orchestrator.py @@ -10,7 +10,6 @@ notification_orchestrator_task_name, notify_task_name, pulls_task_name, - status_set_error_task_name, ) from shared.torngit.exceptions import TorngitClientError, TorngitServerFailureError from shared.yaml import UserYaml @@ -500,17 +499,11 @@ def yaml_checks( commit_yaml, ("codecov", "require_ci_to_pass"), True ) if require_ci_to_pass and ci_results is False: - self.app.tasks[status_set_error_task_name].apply_async( - args=None, - kwargs=dict( - repoid=commit.repoid, commitid=commit.commitid, message="CI failed." - ), - ) log.info( "Not sending notifications because CI failed", extra=log_extra_dict ) return ShouldCallNotifyResponse( - notification_result=ShouldCallNotifyResult.DO_NOT_NOTIFY, + notification_result=ShouldCallNotifyResult.NOTIFY_ERROR, reason="has_require_ci_to_pass_yaml_setting_and_no_ci_results", message="Not sending notifications because CI failed", )
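Note (not part of the patches above): the series moves the notify-or-not decision into the orchestrator task: skip while other uploads still hold the processing lock, retry with backoff otherwise, and, after PATCH 3/3, route a failed CI with require_ci_to_pass into an error notification rather than silently dropping it. Below is a minimal standalone Python sketch of that decision flow. ShouldCallNotifyResult mirrors the enum introduced in the patch, while decide_notification and its parameters are illustrative names only, not code from the repository.

# Standalone sketch of the orchestrator's branching, under the assumptions
# stated above; the real task reads the lock via UPLOAD_PROCESSING_LOCK_NAME
# in Redis and tracks retries through Celery, neither of which is modeled here.
from enum import Enum


class ShouldCallNotifyResult(Enum):
    WAIT_TO_NOTIFY = "wait_to_notify"
    DO_NOT_NOTIFY = "do_not_notify"
    NOTIFY_ERROR = "notify_error"
    NOTIFY = "notify"


def decide_notification(
    upload_lock_held: bool,
    require_ci_to_pass: bool,
    ci_passed: bool | None,
    retries_exhausted: bool,
) -> ShouldCallNotifyResult:
    # Other uploads are still being processed for this repo/commit, so a
    # fresher notification will follow; wait (or give up once retries run out).
    if upload_lock_held:
        return (
            ShouldCallNotifyResult.DO_NOT_NOTIFY
            if retries_exhausted
            else ShouldCallNotifyResult.WAIT_TO_NOTIFY
        )
    # PATCH 3/3 behaviour: a failed CI with require_ci_to_pass set maps to
    # NOTIFY_ERROR (error notification path) instead of DO_NOT_NOTIFY.
    if require_ci_to_pass and ci_passed is False:
        return ShouldCallNotifyResult.NOTIFY_ERROR
    return ShouldCallNotifyResult.NOTIFY


if __name__ == "__main__":
    # CI failed and the repo requires CI to pass: expect the error path.
    print(decide_notification(False, True, False, False))  # NOTIFY_ERROR
    # Another upload still holds the processing lock: expect a wait/retry.
    print(decide_notification(True, True, True, False))  # WAIT_TO_NOTIFY

This sketch only captures the branching that the diffs describe; the side effects (scheduling the notify task, logging UploadFlow checkpoints, committing the session) stay in the orchestrator and notify tasks shown in the patches.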