Skip to content

Commit ea4f3c6

Browse files
committed
Add support to fetch from authenticated token lists
1 parent a4428ab commit ea4f3c6

File tree

3 files changed

+221
-2
lines changed

3 files changed

+221
-2
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Generated by Django 5.2.5 on 2025-12-18 12:13
2+
3+
from django.db import migrations, models
4+
5+
6+
class Migration(migrations.Migration):
7+
8+
dependencies = [
9+
('tokens', '0014_tokennotvalid'),
10+
]
11+
12+
operations = [
13+
migrations.AddField(
14+
model_name='tokenlist',
15+
name='auth_key',
16+
field=models.CharField(blank=True, help_text='Header name or query param name (e.g. api_key, token)', max_length=100),
17+
),
18+
migrations.AddField(
19+
model_name='tokenlist',
20+
name='auth_type',
21+
field=models.CharField(choices=[('none', 'None'), ('api_key', 'API Key (Header)'), ('bearer', 'Bearer Token'), ('basic', 'Basic Auth'), ('query', 'Query Param')], default='none', max_length=20),
22+
),
23+
migrations.AddField(
24+
model_name='tokenlist',
25+
name='auth_value',
26+
field=models.TextField(blank=True, help_text='Token / API key / credentials'),
27+
),
28+
]

safe_transaction_service/tokens/models.py

Lines changed: 94 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import os
33
from json import JSONDecodeError
44
from typing import Optional, TypedDict
5-
from urllib.parse import urljoin
5+
from urllib.parse import parse_qsl, urlencode, urljoin, urlparse, urlunparse
66

77
from django.conf import settings
88
from django.core.exceptions import ValidationError
@@ -341,12 +341,104 @@ class TokenList(models.Model):
341341
url = models.URLField(unique=True)
342342
description = models.CharField(max_length=200)
343343

344+
auth_type = models.CharField(
345+
max_length=20,
346+
choices=[
347+
("none", "None"),
348+
("api_key", "API Key (Header)"),
349+
("bearer", "Bearer Token"),
350+
("basic", "Basic Auth"),
351+
("query", "Query Param"),
352+
],
353+
default="none",
354+
)
355+
356+
auth_key = models.CharField(
357+
max_length=100,
358+
blank=True,
359+
help_text=(
360+
"Authentication key name. Usage depends on auth_type: "
361+
"For 'api_key': header name (e.g., 'x-api-key'). "
362+
"For 'bearer': header name (e.g., 'Authorization'). "
363+
"For 'query': query parameter name (e.g., 'api_key'). "
364+
"For 'basic': leave empty. "
365+
"For 'none': leave empty."
366+
),
367+
)
368+
369+
auth_value = models.TextField(
370+
blank=True,
371+
help_text=(
372+
"Authentication credentials. Usage depends on auth_type: "
373+
"For 'api_key': the API key value. "
374+
"For 'bearer': the bearer token value (without 'Bearer ' prefix). "
375+
"For 'basic': the full 'Basic <base64>' value (e.g., 'Basic dXNlcjpwYXNz'). "
376+
"For 'query': the query parameter value. "
377+
"For 'none': leave empty."
378+
),
379+
)
380+
344381
def __str__(self):
345382
return f"{self.description} token list"
346383

384+
def clean(self):
385+
super().clean()
386+
387+
if self.auth_type == "none":
388+
if self.auth_key or self.auth_value:
389+
raise ValidationError(
390+
"auth_key and auth_value must be empty when auth_type is 'none'"
391+
)
392+
393+
elif self.auth_type in {"bearer", "api_key", "query"}:
394+
if not self.auth_key:
395+
raise ValidationError(
396+
{"auth_key": "auth_key is required for this auth_type"}
397+
)
398+
if not self.auth_value:
399+
raise ValidationError(
400+
{"auth_value": "auth_value is required for this auth_type"}
401+
)
402+
403+
elif self.auth_type == "basic":
404+
if not self.auth_value:
405+
raise ValidationError(
406+
{"auth_value": "auth_value is required for basic auth"}
407+
)
408+
409+
def _build_authenticated_request(self) -> tuple[str, dict]:
410+
"""
411+
Builds the URL and headers with authentication based on the configured auth_type.
412+
Supports bearer tokens, API keys (header-based), basic auth, and query parameters.
413+
414+
:return: Tuple of (authenticated_url, headers_dict)
415+
"""
416+
url = self.url
417+
headers: dict[str, str] = {}
418+
419+
if self.auth_type == "bearer":
420+
header_name = self.auth_key if self.auth_key else "Authorization"
421+
headers[header_name] = f"Bearer {self.auth_value}"
422+
423+
elif self.auth_type == "api_key":
424+
headers[self.auth_key] = self.auth_value
425+
426+
elif self.auth_type == "basic":
427+
headers["Authorization"] = self.auth_value
428+
429+
elif self.auth_type == "query":
430+
parsed = urlparse(url)
431+
query = dict(parse_qsl(parsed.query))
432+
query[self.auth_key] = self.auth_value
433+
434+
url = urlunparse(parsed._replace(query=urlencode(query)))
435+
436+
return url, headers
437+
347438
def get_tokens(self) -> list[TokenListToken]:
348439
try:
349-
response = requests.get(self.url, timeout=5)
440+
url, headers = self._build_authenticated_request()
441+
response = requests.get(url, headers=headers, timeout=5)
350442
if response.ok:
351443
tokens = response.json().get("tokens", [])
352444
if not tokens:

safe_transaction_service/tokens/tests/test_models.py

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,3 +259,102 @@ def test_get_tokens(self, requests_get: MagicMock):
259259

260260
requests_get.return_value.json.return_value = token_list_mock
261261
self.assertEqual(len(token_list.get_tokens()), 35)
262+
263+
@mock.patch("requests.get")
264+
def test_get_tokens_with_auth_none(self, requests_get: MagicMock):
265+
token_list = TokenListFactory(
266+
url="https://example.com/tokens.json",
267+
auth_type="none",
268+
)
269+
270+
requests_get.return_value.ok = True
271+
requests_get.return_value.json.return_value = token_list_mock
272+
273+
tokens = token_list.get_tokens()
274+
275+
self.assertEqual(len(tokens), 35)
276+
requests_get.assert_called_once_with(
277+
"https://example.com/tokens.json", headers={}, timeout=5
278+
)
279+
280+
@mock.patch("requests.get")
281+
def test_get_tokens_with_auth_bearer(self, requests_get: MagicMock):
282+
token_list = TokenListFactory(
283+
url="https://example.com/tokens.json",
284+
auth_type="bearer",
285+
auth_value="my-secret-token",
286+
)
287+
288+
requests_get.return_value.ok = True
289+
requests_get.return_value.json.return_value = token_list_mock
290+
291+
tokens = token_list.get_tokens()
292+
293+
self.assertEqual(len(tokens), 35)
294+
requests_get.assert_called_once_with(
295+
"https://example.com/tokens.json",
296+
headers={"Authorization": "Bearer my-secret-token"},
297+
timeout=5,
298+
)
299+
300+
@mock.patch("requests.get")
301+
def test_get_tokens_with_auth_api_key(self, requests_get: MagicMock):
302+
token_list = TokenListFactory(
303+
url="https://example.com/tokens.json",
304+
auth_type="api_key",
305+
auth_key="X-API-Key",
306+
auth_value="my-api-key-value",
307+
)
308+
309+
requests_get.return_value.ok = True
310+
requests_get.return_value.json.return_value = token_list_mock
311+
312+
tokens = token_list.get_tokens()
313+
314+
self.assertEqual(len(tokens), 35)
315+
requests_get.assert_called_once_with(
316+
"https://example.com/tokens.json",
317+
headers={"X-API-Key": "my-api-key-value"},
318+
timeout=5,
319+
)
320+
321+
@mock.patch("requests.get")
322+
def test_get_tokens_with_auth_basic(self, requests_get: MagicMock):
323+
token_list = TokenListFactory(
324+
url="https://example.com/tokens.json",
325+
auth_type="basic",
326+
auth_value="Basic dXNlcm5hbWU6cGFzc3dvcmQ=",
327+
)
328+
329+
requests_get.return_value.ok = True
330+
requests_get.return_value.json.return_value = token_list_mock
331+
332+
tokens = token_list.get_tokens()
333+
334+
self.assertEqual(len(tokens), 35)
335+
requests_get.assert_called_once_with(
336+
"https://example.com/tokens.json",
337+
headers={"Authorization": "Basic dXNlcm5hbWU6cGFzc3dvcmQ="},
338+
timeout=5,
339+
)
340+
341+
@mock.patch("requests.get")
342+
def test_get_tokens_with_auth_query(self, requests_get: MagicMock):
343+
token_list = TokenListFactory(
344+
url="https://example.com/tokens.json",
345+
auth_type="query",
346+
auth_key="api_key",
347+
auth_value="my-query-param-key",
348+
)
349+
350+
requests_get.return_value.ok = True
351+
requests_get.return_value.json.return_value = token_list_mock
352+
353+
tokens = token_list.get_tokens()
354+
355+
self.assertEqual(len(tokens), 35)
356+
requests_get.assert_called_once_with(
357+
"https://example.com/tokens.json?api_key=my-query-param-key",
358+
headers={},
359+
timeout=5,
360+
)

0 commit comments

Comments
 (0)