Skip to content
8 changes: 8 additions & 0 deletions datatracker/serializers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Copyright The IETF Trust 2025, All Rights Reserved

from rest_framework import serializers


class MergeAuthorSerializer(serializers.Serializer):
old_person_id = serializers.SerializerMethodField()
new_person_id = serializers.SerializerMethodField()
5 changes: 5 additions & 0 deletions purple/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@
"COMPONENT_NO_READ_ONLY_REQUIRED": True,
}

# DRF API tokens
APP_API_TOKENS = {
"purple.api.merge_person": ["not-a-secret"],
}


# Internationalization
# https://docs.djangoproject.com/en/4.2/topics/i18n/
Expand Down
5 changes: 5 additions & 0 deletions purple/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,9 @@ def to_url(self, value):
path("api/rpc/submissions/<int:document_id>/import/", rpc_api.import_submission),
path("api/rpc/version/", rpc_api.version),
path("api/rpc/", include(router.urls)),
path(
"api/merge_person/",
rpc_api.MergePersonView.as_view(),
name="Merge datatracker person",
),
]
30 changes: 25 additions & 5 deletions rpc/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,22 @@
from django.http import JsonResponse
from django_filters import rest_framework as filters
from drf_spectacular.types import OpenApiTypes

from rest_framework.decorators import (
action,
api_view,
permission_classes,
)
from rest_framework.response import Response
from rest_framework.exceptions import PermissionDenied
from rest_framework import serializers
from rest_framework import mixins, views, viewsets
from rest_framework import mixins, views, viewsets, serializers, status
from drf_spectacular.utils import extend_schema, inline_serializer

import rpcapi_client
from datatracker.rpcapi import with_rpcapi

from datatracker.models import Document
from datatracker.models import Document, DatatrackerPerson
from utils.api import requires_api_token
from utils.authentication import ApiKeyAuthentication
from utils.permissions import HasApiKey
from .models import (
Assignment,
Capability,
Expand All @@ -39,6 +40,7 @@
ClusterSerializer,
CreateRfcToBeSerializer,
LabelSerializer,
MergePersonSerializer,
NestedAssignmentSerializer,
QueueItemSerializer,
RfcToBeSerializer,
Expand Down Expand Up @@ -388,3 +390,21 @@ class StreamNameViewSet(viewsets.ReadOnlyModelViewSet):
class TlpBoilerplateChoiceNameViewSet(viewsets.ReadOnlyModelViewSet):
queryset = TlpBoilerplateChoiceName.objects.all()
serializer_class = TlpBoilerplateChoiceNameSerializer


class MergePersonView(views.APIView):
authentication_classes = [ApiKeyAuthentication]
permission_classes = [HasApiKey]
api_key_endpoint = "purple.api.merge_person"

@requires_api_token("purple.api.merge_person")
def post(self, request):
serializer = MergePersonSerializer(data=request.data)
if serializer.is_valid():
old_person_id = serializer.validated_data["old_person_id"]
new_person_id = serializer.validated_data["new_person_id"]
DatatrackerPerson.objects.filter(datatracker_id=old_person_id).update(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this step should update any references to DatatrackerPerson with old_person_id to DatatrackerPerson with new_person_id.
May be delete the old record afterwards?
(@jennifer-richards)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we discussed separately, I am hoping we can avoid having to rewrite a lot of records. If we can let things work with multiple DatatrackerPerson instances holding the same datatracker_id (i.e., removing the current unique constraint plus some additional work), a merge may not need to be much more complicated than what you suggest here.

datatracker_id=new_person_id
)
return Response({"success": True}, status=status.HTTP_200_OK)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
5 changes: 5 additions & 0 deletions rpc/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,11 @@ class SubmissionListItemSerializer(serializers.Serializer):
submitted = serializers.DateTimeField()


class MergePersonSerializer(serializers.Serializer):
old_person_id = serializers.IntegerField()
new_person_id = serializers.IntegerField()


def check_user_has_role(user, role) -> bool:
rpc_person = RpcPerson.objects.filter(
datatracker_person=user.datatracker_person()
Expand Down
78 changes: 78 additions & 0 deletions rpc/tests_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# Copyright The IETF Trust 2025, All Rights Reserved

import json

from django.test import Client, TestCase
from django.test.utils import override_settings


class ApiTests(TestCase):
@override_settings(
APP_API_TOKENS={
"purple.api.merge_person": ["valid-token"],
}
)
def test_merge_person(self):
url = "/api/merge_person/"
client = Client()
data = {"old_person_id": 12, "new_person_id": 13}
headers_valid = {"X_API_KEY": "valid-token"}
headers_invalid = {"X_API_KEY": "INVALID-TOKEN"}

# POST: without an API token
response = client.post(
url, data=json.dumps(data), content_type="application/json"
)
self.assertEqual(response.status_code, 403)

# POST: invalid token
response = client.post(
url,
data=json.dumps(data),
content_type="application/json",
headers=headers_invalid,
)
self.assertEqual(response.status_code, 403)

# POST: valid token with no data
response = client.post(url, headers=headers_valid)

# POST: valid token with incomplete data
incomplete_data = {"old_person_id": 12}
response = client.post(
url,
data=json.dumps(incomplete_data),
content_type="application/json",
headers=headers_valid,
)
self.assertEqual(response.status_code, 400)
self.assertIn("This field is required.", response.json()["new_person_id"])

# POST: valid token with incomplete data
incomplete_data = {"new_person_id": 12}
response = client.post(
url,
data=json.dumps(incomplete_data),
content_type="application/json",
headers=headers_valid,
)
self.assertEqual(response.status_code, 400)
self.assertIn("This field is required.", response.json()["old_person_id"])

## POST valid request and token
response = client.post(
url,
data=json.dumps(data),
content_type="application/json",
headers=headers_valid,
)
self.assertEqual(response.status_code, 200)
self.assertTrue(response.json()["success"])

# GET: valid token
response = client.get(url, headers={"X_API_KEY": "valid-token"})
self.assertEqual(response.status_code, 405)

# GET: invalid token
response = client.get(url, headers={"X_API_KEY": "invalid-token"})
self.assertEqual(response.status_code, 403)
78 changes: 78 additions & 0 deletions utils/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# Copyright The IETF Trust 2025, All Rights Reserved

from functools import wraps
from typing import Callable, Optional, Union

from django.conf import settings
from django.http import HttpResponseForbidden
from rest_framework.request import Request
from rest_framework.views import APIView


def is_valid_token(endpoint, token):
# This is where we would consider integration with vault
# Settings implementation for now.
if hasattr(settings, "APP_API_TOKENS"):
token_store = settings.APP_API_TOKENS
if endpoint in token_store:
endpoint_tokens = token_store[endpoint]
# Be sure endpoints is a list or tuple so we don't accidentally use substring matching!
if not isinstance(endpoint_tokens, (list, tuple)):
endpoint_tokens = [endpoint_tokens]
if token in endpoint_tokens:
return True
return False


def requires_api_token(func_or_endpoint: Optional[Union[Callable, str]] = None):
"""Validate API token before executing the wrapped method

Usage:
* Basic: endpoint defaults to the qualified name of the wrapped method. E.g., in ietf.api.views,

@requires_api_token
def my_view(request):
...

will require a token for "ietf.api.views.my_view"

* Custom endpoint: specify the endpoint explicitly

@requires_api_token("ietf.api.views.some_other_thing")
def my_view(request):
...

will require a token for "ietf.api.views.some_other_thing"
"""

def decorate(f):
if _endpoint is None:
fname = getattr(f, "__qualname__", None)
if fname is None:
raise TypeError(
"Cannot automatically decorate function that does not support __qualname__. "
"Explicitly set the endpoint."
)
endpoint = "{}.{}".format(f.__module__, fname)
else:
endpoint = _endpoint

@wraps(f)
def wrapped(request, *args, **kwargs):
if not isinstance(request, Request) and isinstance(request, APIView):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, though I think isinstance(request, django.views.generic.View) is the right class to use. (APIView is a subclass and gets its request-handling properties through it.)

Maybe you could go full pythonic and just do ... and hasattr(request, "request")? 🦆

request = request.request
authtoken = request.META.get("HTTP_X_API_KEY", None)
if authtoken is None or not is_valid_token(endpoint, authtoken):
return HttpResponseForbidden()
return f(request, *args, **kwargs)

return wrapped

# Magic to allow decorator to be used with or without parentheses
if callable(func_or_endpoint):
func = func_or_endpoint
_endpoint = None
return decorate(func)
else:
_endpoint = func_or_endpoint
return decorate
19 changes: 19 additions & 0 deletions utils/authentication.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Copyright The IETF Trust 2025, All Rights Reserved

from rest_framework import authentication
from django.contrib.auth.models import AnonymousUser


class ApiKeyAuthentication(authentication.BaseAuthentication):
"""API-Key header authentication"""

def authenticate(self, request):
"""Extract the authentication token, if present

This does not validate the token, it just arranges for it to be available in request.auth.
It's up to a Permissions class to validate it for the appropriate endpoint.
"""
token = request.META.get("HTTP_X_API_KEY", None)
if token is None:
return None
return AnonymousUser(), token # available as request.user and request.auth
20 changes: 20 additions & 0 deletions utils/permissions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Copyright The IETF Trust 2025, All Rights Reserved

from rest_framework import permissions

from utils.api import is_valid_token


class HasApiKey(permissions.BasePermission):
"""Permissions class that validates a token using is_valid_token

The view class must indicate the relevant endpoint by setting `api_key_endpoint`.
Must be used with an Authentication class that puts a token in request.auth.
"""

def has_permission(self, request, view):
endpoint = getattr(view, "api_key_endpoint", None)
auth_token = getattr(request, "auth", None)
if endpoint is not None and auth_token is not None:
return is_valid_token(endpoint, auth_token)
return False
85 changes: 85 additions & 0 deletions utils/tests_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# Copyright The IETF Trust 2025, All Rights Reserved

from django.test import TestCase, RequestFactory
from django.test.utils import override_settings

from utils.api import is_valid_token, requires_api_token


class ApiTests(TestCase):
@override_settings(
APP_API_TOKENS={
"purple.api.foobar": ["valid-token"],
"purple.api.misconfigured": "valid-token", # misconfigured
}
)
def test_is_valid_token(self):
self.assertFalse(is_valid_token("purple.fake.endpoint", "valid-token"))
self.assertFalse(is_valid_token("purple.api.foobar", "invalid-token"))
self.assertFalse(is_valid_token("purple.api.foobar", None))
self.assertTrue(is_valid_token("purple.api.foobar", "valid-token"))

# misconfiguration
self.assertFalse(is_valid_token("purple.api.misconfigured", "v"))
self.assertFalse(is_valid_token("purple.api.misconfigured", None))
self.assertTrue(is_valid_token("purple.api.misconfigured", "valid-token"))

@override_settings(
APP_API_TOKENS={
"purple.api.foo": ["valid-token"],
"purple.api.bar": ["another-token"],
"purple.api.misconfigured": "valid-token", # misconfigured
}
)
def test_requires_api_token(self):
@requires_api_token("purple.api.foo")
def protected_function(request):
return f"Access granted: {request.method}"

# request with a valid token
request = RequestFactory().get(
"/some/url", headers={"X_API_KEY": "valid-token"}
)
result = protected_function(request)
self.assertEqual(result, "Access granted: GET")

# request with an invalid token
request = RequestFactory().get(
"/some/url", headers={"X_API_KEY": "invalid-token"}
)
result = protected_function(request)
self.assertEqual(result.status_code, 403)

# request without a token
request = RequestFactory().get("/some/url", headers={"X_API_KEY": ""})
result = protected_function(request)
self.assertEqual(result.status_code, 403)

# request without X_API_KEY header
request = RequestFactory().get("/some/url")
result = protected_function(request)
self.assertEqual(result.status_code, 403)

# request with a valid token for another API endpoint
request = RequestFactory().get(
"/some/url", headers={"X_API_KEY": "another-token"}
)
result = protected_function(request)
self.assertEqual(result.status_code, 403)

# requests for a misconfigured endpoint
@requires_api_token("purple.api.misconfigured")
def another_protected_function(request):
return f"Access granted: {request.method}"

# request with valid token
request = RequestFactory().get(
"/some/url", headers={"X_API_KEY": "valid-token"}
)
result = another_protected_function(request)
self.assertEqual(result, "Access granted: GET")

# request with invalid token with the correct initial character
request = RequestFactory().get("/some/url", headers={"X_API_KEY": "v"})
result = another_protected_function(request)
self.assertEqual(result.status_code, 403)