Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions ansible_base/oauth2_provider/views/token.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import hashlib
from datetime import timedelta

from django.conf import settings
from django.db.models import Q
from django.utils.timezone import now
from oauth2_provider import views as oauth_views
from oauthlib import oauth2
from rest_framework.permissions import SAFE_METHODS
from rest_framework.viewsets import ModelViewSet

from ansible_base.lib.utils.hashing import hash_string
Expand Down Expand Up @@ -75,3 +78,19 @@ class OAuth2TokenViewSet(ModelViewSet, AnsibleBaseDjangoAppApiView):
queryset = OAuth2AccessToken.objects.all()
serializer_class = OAuth2TokenSerializer
permission_classes = [OAuth2ScopePermission, OAuth2TokenPermission]

def get_queryset(self):
qs = super().get_queryset()
user = self.request.user
if user.is_superuser:
return qs
if self.request.method in SAFE_METHODS and getattr(user, 'is_platform_auditor', False):
return qs
q = Q(user=user)
if 'ansible_base.rbac' in settings.INSTALLED_APPS:
from ansible_base.lib.utils.auth import get_organization_model

org_cls = get_organization_model()
admin_org_ids = org_cls.access_ids_qs(user, 'change')
q |= Q(application__organization_id__in=admin_org_ids)
return qs.filter(q)
123 changes: 111 additions & 12 deletions test_app/tests/oauth2_provider/test_rbac.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
conditions are not met).
"""

from datetime import datetime, timezone

import pytest
from oauthlib.common import generate_token

from ansible_base.lib.utils.response import get_relative_url
from ansible_base.oauth2_provider.models import OAuth2AccessToken
Expand Down Expand Up @@ -191,9 +194,18 @@ def test_oauth2_application_token_read_change_delete(
app.user = random_user
app.save()

expected_read_status = 200 if has_access else 403
expected_change_status = 200 if has_access else 403
expected_delete_status = 204 if has_access else 403
if has_access:
expected_read_status = 200
expected_change_status = 200
expected_delete_status = 204
elif user_case == 'anon':
expected_read_status = 401
expected_change_status = 401
expected_delete_status = 401
else:
expected_read_status = 404
expected_change_status = 404
expected_delete_status = 404

# Determine the user based on the test case (adding them to organizations, etc. as necessary).
if user_case == 'superuser':
Expand All @@ -215,9 +227,6 @@ def test_oauth2_application_token_read_change_delete(
pass
elif user_case == 'anon':
user = None
expected_read_status = 401
expected_change_status = 401
expected_delete_status = 401
else:
raise ValueError(f"Invalid user_case: {user_case}")

Expand Down Expand Up @@ -346,6 +355,90 @@ def test_oauth2_pat_create(request, org_member_rd, org_admin_rd, user, random_us
assert token.user == user # not random_user


@pytest.mark.parametrize(
'user_case, sees_all_tokens',
[
('superuser', True),
('org_admin', False),
('token_owner', False),
('other_user', False),
],
)
@pytest.mark.django_db
def test_oauth2_token_list_filtered_by_user(
request,
user_case,
sees_all_tokens,
admin_user,
user,
random_user,
unauthenticated_api_client,
oauth2_user_pat,
oauth2_application,
organization,
org_admin_rd,
):
"""
Non-superusers should only see their own tokens when listing.
Superusers should see all tokens.
Org admins should see tokens for applications in their org.
(AAP-72278)
"""
other_token = OAuth2AccessToken.objects.create(
user=random_user,
description="Token for random_user",
expires=datetime(2088, 1, 1, tzinfo=timezone.utc),
token=generate_token(),
)

app = oauth2_application[0]
app.organization = organization
app.save()
org_app_token = OAuth2AccessToken.objects.create(
user=random_user,
application=app,
description="App token in org",
expires=datetime(2088, 1, 1, tzinfo=timezone.utc),
token=generate_token(),
)

if user_case == 'superuser':
acting_user = admin_user
elif user_case == 'org_admin':
acting_user = request.getfixturevalue('random_user_1')
RoleDefinition.objects.managed.org_admin.give_permission(acting_user, organization)
elif user_case == 'token_owner':
acting_user = user
elif user_case == 'other_user':
acting_user = random_user
else:
raise ValueError(f"Invalid user_case: {user_case}")

unauthenticated_api_client.force_login(acting_user)
url = get_relative_url("token-list")
response = unauthenticated_api_client.get(url)
assert response.status_code == 200

token_ids = {t['id'] for t in response.data['results']}

if sees_all_tokens:
assert oauth2_user_pat.id in token_ids
assert other_token.id in token_ids
assert org_app_token.id in token_ids
elif user_case == 'org_admin':
assert org_app_token.id in token_ids, "Org admin should see app tokens in their org"
assert oauth2_user_pat.id not in token_ids, "Org admin should not see unrelated PATs"
assert other_token.id not in token_ids, "Org admin should not see unrelated PATs"
elif user_case == 'token_owner':
assert oauth2_user_pat.id in token_ids
assert other_token.id not in token_ids
assert org_app_token.id not in token_ids
elif user_case == 'other_user':
assert other_token.id in token_ids
assert org_app_token.id in token_ids, "Token owner should see their own app tokens"
assert oauth2_user_pat.id not in token_ids


@pytest.mark.parametrize(
'user_case, has_access',
[
Expand All @@ -364,9 +457,18 @@ def test_oauth2_pat_read_change_delete(request, user_case, has_access, org_membe
- I am the user of the token
- I am the superuser
"""
expected_read_status = 200 if has_access else 403
expected_change_status = 200 if has_access else 403
expected_delete_status = 204 if has_access else 403
if has_access:
expected_read_status = 200
expected_change_status = 200
expected_delete_status = 204
elif user_case == 'anon':
expected_read_status = 401
expected_change_status = 401
expected_delete_status = 401
else:
expected_read_status = 404
expected_change_status = 404
expected_delete_status = 404

# Determine the user based on the test case (adding them to organizations, etc. as necessary).
if user_case == 'superuser':
Expand All @@ -380,9 +482,6 @@ def test_oauth2_pat_read_change_delete(request, user_case, has_access, org_membe
user = request.getfixturevalue('random_user')
elif user_case == 'anon':
user = None
expected_read_status = 401
expected_change_status = 401
expected_delete_status = 401
else:
raise ValueError(f"Invalid user_case: {user_case}")

Expand Down
Loading