-
Notifications
You must be signed in to change notification settings - Fork 28
[feat] Add endpoint that checks whether an owner has Gen AI consent #1102
Changes from 16 commits
a82089d
7e5a1a9
e47e5a3
d4ba9a6
9f365bd
5bdbff6
28457dd
a0a5ad8
d52a405
4c5ef9d
d6192d5
4e4f279
ed71817
386372f
c0b07ff
da13f82
624ee10
ac5908b
34c1039
9ebf19b
a351940
3f70d80
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
from rest_framework import serializers | ||
|
||
|
||
class GenAIAuthSerializer(serializers.Serializer): | ||
is_valid = serializers.BooleanField() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
import hmac | ||
import json | ||
from hashlib import sha256 | ||
from unittest.mock import patch | ||
|
||
from django.urls import reverse | ||
from rest_framework import status | ||
from rest_framework.test import APITestCase | ||
from shared.django_apps.core.tests.factories import OwnerFactory | ||
|
||
from codecov_auth.models import GithubAppInstallation | ||
|
||
PAYLOAD_SECRET = b"testixik8qdauiab1yiffydimvi72ekq" | ||
VIEW_URL = reverse("auth") | ||
|
||
|
||
def sign_payload(payload, secret=PAYLOAD_SECRET): | ||
data = json.dumps(payload, separators=(",", ":")).encode("utf-8") | ||
signature = "sha256=" + hmac.new(secret, data, digestmod=sha256).hexdigest() | ||
return signature, data | ||
|
||
|
||
class GenAIAuthViewTests(APITestCase): | ||
@patch("utils.config.get_config", return_value=PAYLOAD_SECRET) | ||
def test_missing_parameters(self, mock_config): | ||
payload = {} | ||
sig, data = sign_payload(payload) | ||
response = self.client.post( | ||
VIEW_URL, | ||
data=payload, | ||
content_type="application/json", | ||
HTTP_HTTP_X_GEN_AI_AUTH_SIGNATURE=sig, | ||
) | ||
self.assertEqual(response.status_code, 400) | ||
self.assertIn("Missing required parameters", response.data) | ||
|
||
@patch("utils.config.get_config", return_value=PAYLOAD_SECRET) | ||
def test_invalid_signature(self, mock_config): | ||
payload = {"external_owner_id": "owner1", "repo_service_id": "101"} | ||
# Create a wrong signature by altering the payload before signing | ||
wrong_sig = ( | ||
"sha256=" + hmac.new(PAYLOAD_SECRET, b"{}", digestmod=sha256).hexdigest() | ||
) | ||
response = self.client.post( | ||
VIEW_URL, | ||
data=payload, | ||
content_type="application/json", | ||
HTTP_HTTP_X_GEN_AI_AUTH_SIGNATURE=wrong_sig, | ||
) | ||
self.assertEqual(response.status_code, 403) | ||
|
||
@patch("utils.config.get_config", return_value=PAYLOAD_SECRET) | ||
def test_owner_not_found(self, mock_config): | ||
payload = {"external_owner_id": "nonexistent_owner", "repo_service_id": "101"} | ||
sig, serialized_data = sign_payload(payload) | ||
response = self.client.post( | ||
VIEW_URL, | ||
HTTP_HTTP_X_GEN_AI_AUTH_SIGNATURE=sig, | ||
data=serialized_data, | ||
content_type="application/json", | ||
) | ||
self.assertEqual(response.status_code, 404) | ||
|
||
@patch("utils.config.get_config", return_value=PAYLOAD_SECRET) | ||
def test_no_installation(self, mock_config): | ||
_ = OwnerFactory(service="github", service_id="owner1", username="test1") | ||
payload = {"external_owner_id": "owner1", "repo_service_id": "101"} | ||
sig, data = sign_payload(payload) | ||
response = self.client.post( | ||
VIEW_URL, | ||
data=data, | ||
content_type="application/json", | ||
HTTP_HTTP_X_GEN_AI_AUTH_SIGNATURE=sig, | ||
) | ||
|
||
self.assertEqual(response.status_code, 200) | ||
self.assertEqual(response.data, {"is_valid": False}) | ||
|
||
@patch("utils.config.get_config", return_value=PAYLOAD_SECRET) | ||
def test_authorized(self, mock_config): | ||
owner = OwnerFactory(service="github", service_id="owner2", username="test2") | ||
app_install = GithubAppInstallation( | ||
installation_id=12345, | ||
owner=owner, | ||
name="ai-features", | ||
repository_service_ids=["101", "202"], | ||
) | ||
app_install.save() | ||
payload = {"external_owner_id": "owner2", "repo_service_id": "101"} | ||
sig, data = sign_payload(payload) | ||
response = self.client.post( | ||
VIEW_URL, | ||
data=data, | ||
content_type="application/json", | ||
HTTP_HTTP_X_GEN_AI_AUTH_SIGNATURE=sig, | ||
) | ||
self.assertEqual(response.status_code, status.HTTP_200_OK) | ||
self.assertEqual(response.data, {"is_valid": True}) | ||
|
||
@patch("utils.config.get_config", return_value=PAYLOAD_SECRET) | ||
def test_unauthorized(self, mock_config): | ||
owner = OwnerFactory(service="github", service_id="owner3", username="test3") | ||
# Create a GithubAppInstallation where the list does not include the requested repo_service_id. | ||
app_install = GithubAppInstallation.objects.create( | ||
installation_id=2, | ||
owner=owner, | ||
name="ai-features", | ||
repository_service_ids=["303", "404"], | ||
) | ||
app_install.save() | ||
payload = {"external_owner_id": "owner3", "repo_service_id": "101"} | ||
sig, data = sign_payload(payload) | ||
response = self.client.post( | ||
VIEW_URL, | ||
data=data, | ||
content_type="application/json", | ||
HTTP_HTTP_X_GEN_AI_AUTH_SIGNATURE=sig, | ||
) | ||
self.assertEqual(response.status_code, 200) | ||
self.assertEqual(response.data, {"is_valid": False}) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
from django.urls import path | ||
|
||
from .views import GenAIAuthView | ||
|
||
urlpatterns = [ | ||
path("auth/", GenAIAuthView.as_view(), name="auth"), | ||
] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
import hmac | ||
import logging | ||
from hashlib import sha256 | ||
|
||
from django.utils.crypto import constant_time_compare | ||
from rest_framework.exceptions import NotFound, PermissionDenied | ||
from rest_framework.permissions import AllowAny | ||
from rest_framework.response import Response | ||
from rest_framework.views import APIView | ||
|
||
from api.gen_ai.serializers import GenAIAuthSerializer | ||
from codecov_auth.models import GithubAppInstallation, Owner | ||
from graphql_api.types.owner.owner import AI_FEATURES_GH_APP_ID | ||
from utils.config import get_config | ||
|
||
log = logging.getLogger(__name__) | ||
|
||
|
||
class GenAIAuthView(APIView): | ||
permission_classes = [AllowAny] | ||
serializer_class = GenAIAuthSerializer | ||
|
||
def validate_signature(self, request): | ||
key = get_config( | ||
"gen_ai", "auth_secret", default=b"testixik8qdauiab1yiffydimvi72ekq" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it possible to avoid setting a default here and instead error out if it is not set? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Our general pattern in this codebase is to do this for testing. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What about self-hosted users? Will all of those installations end up with this same secret? |
||
) | ||
if isinstance(key, str): | ||
key = key.encode("utf-8") | ||
expected_sig = request.headers.get("HTTP-X-GEN-AI-AUTH-SIGNATURE") | ||
print(request.headers) | ||
rohitvinnakota-codecov marked this conversation as resolved.
Show resolved
Hide resolved
|
||
computed_sig = ( | ||
"sha256=" + hmac.new(key, request.body, digestmod=sha256).hexdigest() | ||
) | ||
if not (expected_sig and constant_time_compare(computed_sig, expected_sig)): | ||
rohitvinnakota-codecov marked this conversation as resolved.
Show resolved
Hide resolved
|
||
raise PermissionDenied("Invalid signature") | ||
|
||
def post(self, request, *args, **kwargs): | ||
self.validate_signature(request) | ||
external_owner_id = request.data.get("external_owner_id") | ||
repo_service_id = request.data.get("repo_service_id") | ||
if not external_owner_id or not repo_service_id: | ||
return Response("Missing required parameters", status=400) | ||
try: | ||
owner = Owner.objects.get(service_id=external_owner_id) | ||
except Owner.DoesNotExist: | ||
raise NotFound("Owner not found") | ||
|
||
is_authorized = True | ||
|
||
app_install = GithubAppInstallation.objects.filter( | ||
owner_id=owner.ownerid, app_id=AI_FEATURES_GH_APP_ID | ||
).first() | ||
|
||
if not app_install: | ||
is_authorized = False | ||
|
||
else: | ||
repo_ids = app_install.repository_service_ids | ||
if repo_ids and repo_service_id not in repo_ids: | ||
is_authorized = False | ||
|
||
return Response({"is_valid": is_authorized}) |
Uh oh!
There was an error while loading. Please reload this page.