Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 30 additions & 5 deletions api/mixins.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from typing import Optional

from rest_framework.exceptions import APIException

from tigacrafting.models import IdentificationTask

class IdentificationTaskNestedAttribute():
Expand All @@ -10,8 +12,31 @@ def get_identification_task_obj(self) -> Optional[IdentificationTask]:
if task_id := self.kwargs.get(self.get_parent_lookup_url_kwarg(), None):
return IdentificationTask.objects.get(pk=task_id)

def get_permissions(self):
return [
permission(identification_task=self.get_identification_task_obj())
for permission in self.permission_classes
]
def check_parent_permissions(self, request) -> bool:
from .views import IdentificationTaskViewSet
if self.action in ['list', 'retrieve']:
parent_view = IdentificationTaskViewSet()
parent_view.action = 'retrieve'
parent_view.request = request
parent_view.kwargs = self.kwargs

identification_task_obj = self.get_identification_task_obj()

try:
parent_view.check_object_permissions(request, identification_task_obj)
except APIException:
pass
else:
return True
return False

def check_permissions(self, request):
if self.check_parent_permissions(request):
return
# If parent permissions are not checked or failed, check the current view's permissions
super().check_permissions(request)

def check_object_permissions(self, request, obj):
if self.check_parent_permissions(request):
return
super().check_object_permissions(request, obj)
81 changes: 67 additions & 14 deletions api/permissions.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
from typing import Optional

from django.contrib.auth import get_user_model
from django.core.exceptions import MultipleObjectsReturned

from rest_framework import permissions

from tigacrafting.models import IdentificationTask, ExpertReportAnnotation
from tigacrafting.models import IdentificationTask, ExpertReportAnnotation, UserStat
from tigaserver_app.models import TigaUser, Notification

from .utils import get_fk_fieldnames
Expand Down Expand Up @@ -137,6 +135,51 @@ def has_permission(self, request, view):
return super().has_permission(request=request, view=view)
return False

class UserRolePermission(permissions.BasePermission):
ACTION_TO_PERMISSION = {
'retrieve': 'view',
'list': 'view',
'update': 'change',
'create': 'add',
'destroy': 'delete'
}

def check_permissions(self, user, action, obj_or_klass) -> bool:
if isinstance(user, User):
user = UserStat.objects.filter(user=user).first()
if not user:
return False

return user.has_role_permission(
action=action,
obj_or_klass=obj_or_klass
) if action is not None else False

def has_permission(self, request, view):
if not request.user or not request.user.is_authenticated:
return False

if not view.action:
return False

return self.check_permissions(
user=request.user,
action=self.ACTION_TO_PERMISSION.get(view.action),
obj_or_klass=view.get_queryset().model
)

def has_object_permission(self, request, view, obj):
if not request.user or not request.user.is_authenticated:
return False

if not view.action:
return False

return self.check_permissions(
user=request.user,
action=self.ACTION_TO_PERMISSION.get(view.action),
obj_or_klass=obj
)

class BaseIdentificationTaskPermissions(FullDjangoModelPermissions):
def _check_is_annotator(self, request, view, obj) -> bool:
Expand All @@ -160,6 +203,8 @@ def _check_is_annotator(self, request, view, obj) -> bool:
return task.annotators.filter(pk=request.user.pk).exists()

def has_permission(self, request, view):
if isinstance(request.user, TigaUser):
return False
if request.user and request.user.is_authenticated:
if view.action == 'retrieve':
return True
Expand All @@ -176,7 +221,19 @@ def has_object_permission(self, request, view, obj):
return request.user.has_perms(perms)

class IdentificationTaskPermissions(BaseIdentificationTaskPermissions):
pass
def has_permission(self, request, view):
if not request.user or not request.user.is_authenticated:
return False

role_perm = False
if view.action == 'list':
role_perm = UserRolePermission().check_permissions(
user=request.user,
action='add',
obj_or_klass=ExpertReportAnnotation
)

return super().has_permission(request, view) or role_perm

class MyIdentificationTaskPermissions(DjangoRegularUserModelPermissions):
pass
Expand All @@ -189,20 +246,16 @@ def has_permission(self, request, view):
return request.user.has_perm('%(app_label)s.add_%(model_name)s' % {
'app_label': ExpertReportAnnotation._meta.app_label,
'model_name': ExpertReportAnnotation._meta.model_name
})
}) or UserRolePermission().check_permissions(
user=request.user,
action='add',
obj_or_klass=ExpertReportAnnotation
)

class BaseIdentificationTaskAttributePermissions(BaseIdentificationTaskPermissions):
def __init__(self, identification_task, *args, **kwargs):
self.identification_task = identification_task
super().__init__(*args, **kwargs)

def has_permission(self, request, view):
if view.action == 'list' and self.identification_task:
if self._check_is_annotator(request, view, obj=self.identification_task):
return True
return super().has_permission(request, view)
pass


class AnnotationPermissions(BaseIdentificationTaskAttributePermissions):
# Always allow retrieve owned attributes
def has_object_permission(self, request, view, obj):
Expand Down
103 changes: 101 additions & 2 deletions api/serializers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from abc import abstractmethod
from datetime import datetime
from typing import Literal, Optional
from typing import Literal, Optional, Union
from uuid import UUID

from django.contrib.auth import get_user_model
Expand All @@ -11,6 +12,7 @@
from rest_framework import serializers

from drf_extra_fields.geo_fields import PointField
from rest_framework_dataclasses.serializers import DataclassSerializer
from taggit.serializers import TaggitSerializer

from tigacrafting.models import (
Expand All @@ -21,6 +23,7 @@
PhotoPrediction,
FavoritedReports
)
from tigacrafting.permissions import Permissions, Role
from tigaserver_app.models import (
NotificationContent,
Notification,
Expand Down Expand Up @@ -102,6 +105,88 @@ class Meta:
"received_at": {"source": "server_upload_time"},
}

class PermissionsSerializer(DataclassSerializer):
class Meta:
dataclass = Permissions

class BaseRolePermissionSerializer(serializers.Serializer):
role = serializers.SerializerMethodField()
permissions = serializers.SerializerMethodField()

@abstractmethod
def _get_role(self, obj: Union[User, TigaUser]) -> Role:
raise NotImplementedError

def get_role(self, obj: Union[User, TigaUser]) -> Role:
if isinstance(obj, User):
try:
obj = obj.userstat
except UserStat.DoesNotExist:
obj = None

if not obj:
obj = TigaUser()
return self._get_role(obj=obj)

@extend_schema_field(PermissionsSerializer)
def get_permissions(self, obj: Union[User, TigaUser]):
if isinstance(obj, User):
try:
obj = obj.userstat
except UserStat.DoesNotExist:
obj = None

if not obj:
obj = TigaUser()

permissions = obj.get_role_permissions(
role=self.get_role(obj=obj)
)
return PermissionsSerializer(permissions).data

class UserPermissionSerializer(serializers.Serializer):
class GeneralPermissionSerializer(BaseRolePermissionSerializer):
is_staff = serializers.BooleanField()

def _get_role(self, obj: Union[User, TigaUser]) -> Role:
return obj.get_role()

class CountryPermissionSerializer(BaseRolePermissionSerializer):
def __init__(self, *args, **kwargs):
self.country = kwargs.pop('country', None)
super().__init__(*args, **kwargs)

country = serializers.SerializerMethodField()

def _get_role(self, obj: Union[User, TigaUser]) -> Role:
return obj.get_role(country=self.country)

@extend_schema_field(CountrySerializer)
def get_country(self, obj: Union[User, TigaUser]):
return CountrySerializer(self.country).data

general = serializers.SerializerMethodField()
countries = serializers.SerializerMethodField()

@extend_schema_field(GeneralPermissionSerializer)
def get_general(self, obj: Union[User, TigaUser]):
return self.GeneralPermissionSerializer(obj).data

@extend_schema_field(CountryPermissionSerializer(many=True))
def get_countries(self, obj: Union[User, TigaUser]):
if isinstance(obj, User):
try:
obj = obj.userstat
except UserStat.DoesNotExist:
obj = None
if not obj:
return self.CountryPermissionSerializer(many=True).data

result = []
for country in obj.get_countries_with_roles():
result.append(self.CountryPermissionSerializer(instance=obj, country=country).data)
return result

class UserSerializer(serializers.ModelSerializer):
class UserScoreSerializer(serializers.ModelSerializer):
value = serializers.IntegerField(source="score_v2", min_value=0, read_only=True)
Expand Down Expand Up @@ -936,7 +1021,21 @@ def validate(self, data):
data["status"] = ExpertReportAnnotation.STATUS_PUBLIC

data['validation_complete_executive'] = data.pop("is_decisive")

user_role = data['user']
if isinstance(user_role, User):
try:
user_role = user_role.userstat
except UserStat.DoesNotExist:
user_role = None
can_set_is_decisive = False
if user_role:
can_set_is_decisive = user_role.has_role_permission_by_model(
action='mark_as_decisive',
model=ExpertReportAnnotation,
country=data['report'].country
)
if not can_set_is_decisive:
data['validation_complete_executive'] = False
return data

def create(self, validated_data):
Expand Down
48 changes: 46 additions & 2 deletions api/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from rest_framework.authtoken.models import Token

from django.contrib.auth.models import Group
from django.core.files.uploadedfile import SimpleUploadedFile
from django.core.management import call_command
from django.contrib.auth import get_user_model
Expand All @@ -18,7 +19,7 @@
from api.tests.utils import grant_permission_to_user
from api.tests.clients import AppAPIClient

from tigacrafting.models import IdentificationTask, Taxon
from tigacrafting.models import IdentificationTask, Taxon, UserStat
from tigaserver_app.models import EuropeCountry, Report, Photo

from .factories import create_mobile_user, create_regular_user
Expand Down Expand Up @@ -250,4 +251,47 @@ def taxon_root():
rank=Taxon.TaxonomicRank.CLASS,
name="Insecta",
common_name=""
)
)

@pytest.fixture
def group_expert():
group, _ = Group.objects.get_or_create(name="expert")
return group

@pytest.fixture
def group_superexpert():
group, _ = Group.objects.get_or_create(name="superexpert")
return group

@pytest.fixture
def user_with_role_annotator(user, group_expert):
user.groups.add(group_expert)
return user

@pytest.fixture
def user_with_role_annotator_in_country(user_with_role_annotator, es_country):
user_stat = UserStat.objects.get(user=user_with_role_annotator)
user_stat.native_of = es_country
user_stat.save()

return user_with_role_annotator

@pytest.fixture
def user_with_role_supervisor_in_country(user, group_expert, es_country):
user.groups.add(group_expert)
user_stat = UserStat.objects.get(user=user)
user_stat.national_supervisor_of = es_country
user_stat.save()

return user

@pytest.fixture
def user_with_role_reviewer(user, group_superexpert):
user.groups.add(group_superexpert)
return user

@pytest.fixture
def user_with_role_admin(user):
user.is_superuser = True
user.save()
return user
Loading
Loading