-
Notifications
You must be signed in to change notification settings - Fork 199
Expand file tree
/
Copy pathtest_auth.py
More file actions
128 lines (90 loc) · 3.03 KB
/
test_auth.py
File metadata and controls
128 lines (90 loc) · 3.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import base64
import ipfshttpclient
import json
import pytest
from ipaddress import ip_address, IPv4Address, IPv6Address
from ipfshttpclient.exceptions import StatusError
from multiaddr import Multiaddr
from _pytest.fixtures import FixtureRequest
from pytest_localserver.http import ContentServer
from urllib.parse import urlparse
from werkzeug import Request, Response
# Credentials accepted by the authenticating test server defined in this module;
# requests that do not present exactly this username/password pair are answered
# with a 401.
BASIC_USERNAME = 'basic_username'
BASIC_PASSWORD = 'basic_password'
def _basic_auth_token(username: str, password: str) -> str:
return base64.b64encode(f'{username}:{password}'.encode('utf-8')).decode('utf-8')
def _url_to_multiaddr(url: str) -> Multiaddr:
    """Convert a URL into a ``Multiaddr`` of the form
    ``/<ip4|ip6|dns>/<host>/tcp/<port>/<scheme>``.

    The host protocol is ``/ip4`` or ``/ip6`` when the URL's hostname parses
    as an IP address of that family, and ``/dns`` otherwise. The port and
    scheme are taken verbatim from the parsed URL; no default port is
    inferred from the scheme, so the URL is expected to carry one.

    Raises:
        TypeError: if the hostname parses to an address object that is
            neither an ``IPv4Address`` nor an ``IPv6Address``.
    """
    parts = urlparse(url)
    host = parts.hostname

    try:
        address = ip_address(host)
    except ValueError:
        # Not an IP literal: treat the hostname as a DNS name.
        protocol = '/dns'
    else:
        if isinstance(address, IPv4Address):
            protocol = '/ip4'
        elif isinstance(address, IPv6Address):
            protocol = '/ip6'
        else:
            raise TypeError(f"Don't know how to convert {address} to a {Multiaddr.__name__}")

    return Multiaddr(f'{protocol}/{host}/tcp/{parts.port}/{parts.scheme}')
class AuthenticatingServer(ContentServer):
    """Content server that demands HTTP Basic credentials.

    Requests presenting the expected ``BASIC_USERNAME``/``BASIC_PASSWORD``
    pair receive the configured status code, headers and content; every
    other request receives a 401 with the body ``Unauthorized``.
    """

    @staticmethod
    def _is_authorized(expected_credentials: str, request: Request) -> bool:
        """Return True only if the request carries exactly one
        ``Authorization`` header and it equals ``Basic <expected_credentials>``.
        """
        presented = request.headers.get_all('Authorization')
        # Missing header, or more than one: reject outright.
        if not presented or len(presented) != 1:
            return False
        return presented[0] == f'Basic {expected_credentials}'

    def __call__(self, environ, start_response) -> Response:
        """WSGI entry point: record the request, then answer it according to
        whether it is authorized.

        NOTE(review): the annotation says ``Response``, but the value returned
        is the result of invoking the response object as a WSGI application
        - confirm and adjust the annotation if desired.
        """
        incoming = Request(environ)
        # Every request is recorded, authorized or not.
        self.requests.append(incoming)

        token = _basic_auth_token(BASIC_USERNAME, BASIC_PASSWORD)
        if not self._is_authorized(token, incoming):
            denied = Response(status=401)
            denied.headers.clear()
            denied.data = 'Unauthorized'
            return denied(environ, start_response)

        reply = Response(status=self.code)
        # Drop the default headers so only the configured ones are sent.
        reply.headers.clear()
        reply.headers.extend(self.headers)
        reply.data = self.content
        return reply(environ, start_response)
@pytest.fixture(scope='module')
def authenticating_server(request: FixtureRequest) -> ContentServer:
    """Module-scoped fixture providing a started ``AuthenticatingServer``.

    The server's ``stop`` method is registered as a finalizer on the
    requesting fixture context.
    """
    auth_server = AuthenticatingServer()
    auth_server.start()
    # Registered after start(), so a failed start is not followed by stop().
    request.addfinalizer(auth_server.stop)
    return auth_server
def test_basic_auth_failure(authenticating_server: ContentServer) -> None:
    """Connecting with wrong credentials raises ``StatusError`` with a 401."""
    payload = json.dumps({'Version': '0.0.1'})
    authenticating_server.serve_content(
        payload,
        headers={'Content-Type': 'text/json'},
    )
    multiaddr = _url_to_multiaddr(authenticating_server.url)

    with pytest.raises(StatusError) as raised:
        ipfshttpclient.connect(addr=multiaddr, auth=('wrong', 'wrong'))

    # Checked after the block: the exception info is only populated once the
    # context manager has exited.
    assert raised.value.status_code == 401
def test_basic_auth_success(authenticating_server: ContentServer) -> None:
    """Connecting with the expected credentials succeeds and the client
    reads back the version served by the test server.
    """
    expected_version = '0.0.1'
    payload = json.dumps({'Version': expected_version})
    authenticating_server.serve_content(
        payload,
        headers={'Content-Type': 'text/json'},
    )
    multiaddr = _url_to_multiaddr(authenticating_server.url)

    credentials = (BASIC_USERNAME, BASIC_PASSWORD)
    with ipfshttpclient.connect(addr=multiaddr, auth=credentials) as client:
        assert client.version()['Version'] == expected_version