|
| 1 | +from django.conf import settings |
1 | 2 | from django.contrib.auth.models import AnonymousUser
|
2 | 3 |
|
3 |
| -from codecov.commands.exceptions import MissingService |
| 4 | +import services.self_hosted as self_hosted |
| 5 | +from codecov.commands.exceptions import ( |
| 6 | + MissingService, |
| 7 | + Unauthenticated, |
| 8 | + Unauthorized, |
| 9 | + ValidationError, |
| 10 | +) |
| 11 | +from codecov_auth.helpers import current_user_part_of_org |
4 | 12 | from codecov_auth.models import Owner, User
|
| 13 | +from core.models import Repository |
5 | 14 |
|
6 | 15 |
|
7 | 16 | class BaseCommand:
|
@@ -44,3 +53,59 @@ def __init__(self, current_owner: Owner, service: str, current_user: User = None
|
44 | 53 |
|
45 | 54 | if self.current_owner:
|
46 | 55 | self.current_user = self.current_owner.user
|
| 56 | + |
| 57 | + def ensure_is_admin(self, owner: Owner) -> None: |
| 58 | + """ |
| 59 | + Ensures that the `current_owner` is an admin of `owner`, |
| 60 | + or raise `Unauthorized` otherwise. |
| 61 | + """ |
| 62 | + |
| 63 | + if not current_user_part_of_org(self.current_owner, owner): |
| 64 | + raise Unauthorized() |
| 65 | + |
| 66 | + if settings.IS_ENTERPRISE: |
| 67 | + if not self_hosted.is_admin_owner(self.current_owner): |
| 68 | + raise Unauthorized() |
| 69 | + else: |
| 70 | + if not owner.is_admin(self.current_owner): |
| 71 | + raise Unauthorized() |
| 72 | + |
| 73 | + def resolve_owner_and_repo( |
| 74 | + self, |
| 75 | + owner_username: str, |
| 76 | + repo_name: str, |
| 77 | + ensure_is_admin: bool = False, |
| 78 | + only_viewable: bool = False, |
| 79 | + only_active: bool = False, |
| 80 | + ) -> tuple[Owner, Repository]: |
| 81 | + """ |
| 82 | + Resolves the `Owner` and `Repository` based on the passed `owner_username` |
| 83 | + and `repo_name` respectively. |
| 84 | +
|
| 85 | + If `ensure_is_admin` is set, this will also ensure that the `current_owner` is an |
| 86 | + admin on the resolved `Owner`. |
| 87 | + """ |
| 88 | + if ensure_is_admin and not self.current_user.is_authenticated: |
| 89 | + raise Unauthenticated() |
| 90 | + |
| 91 | + owner = Owner.objects.filter( |
| 92 | + service=self.service, username=owner_username |
| 93 | + ).first() |
| 94 | + |
| 95 | + if not owner: |
| 96 | + raise ValidationError("Owner not found") |
| 97 | + |
| 98 | + if ensure_is_admin: |
| 99 | + self.ensure_is_admin(owner) |
| 100 | + |
| 101 | + repo_query = Repository.objects |
| 102 | + if only_viewable: |
| 103 | + repo_query = repo_query.viewable_repos(self.current_owner) |
| 104 | + if only_active: |
| 105 | + repo_query = repo_query.filter(active=True) |
| 106 | + |
| 107 | + repo = repo_query.filter(author=owner, name=repo_name).first() |
| 108 | + if not repo: |
| 109 | + raise ValidationError("Repo not found") |
| 110 | + |
| 111 | + return (owner, repo) |
0 commit comments