Skip to content

Commit b35f8ca

Browse files
committed
Adds vitrea auth that can be used with requests Refs #12
1 parent e989dbe commit b35f8ca

3 files changed

Lines changed: 281 additions & 183 deletions

File tree

dicomtrolley/auth.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
"""Authentication mechanisms for DICOM servers"""
2+
3+
from requests.auth import AuthBase
4+
from requests.models import Request
5+
6+
from dicomtrolley.exceptions import DICOMTrolleyError
7+
8+
9+
class VitreaAuth(AuthBase):
10+
"""Can log in to a server running Vitrea Connection 8.2.0.1
11+
12+
Raises
13+
------
14+
DICOMTrolleyAuthError
15+
If logging in fails
16+
17+
Notes
18+
-----
19+
Vitrea login returns an auth token, but for some reason this is not checked
20+
at all and instead all validation is done based on session token.
21+
"""
22+
23+
def __init__(self, login_url, user, password, realm):
24+
self.login_url = login_url
25+
self.user = user
26+
self.password = password
27+
self.realm = realm
28+
29+
def response_hook(self, r, **kwargs):
30+
"""Called before returning response. Try to log if not authenticated"""
31+
if r.status_code == 401:
32+
"""Not logged in, try to log in and retry request"""
33+
# first log in. This should automatically save the session ID
34+
login_response = self.do_login_call(r.connection)
35+
request = r.request.copy()
36+
request.prepare_cookies(
37+
login_response.cookies
38+
) # transfer manually here
39+
retry_response = r.connection.send(request, **kwargs)
40+
41+
# history makes cookies persist in session
42+
retry_response.history.append(login_response)
43+
return retry_response
44+
else:
45+
return r
46+
47+
def do_login_call(self, connection):
48+
"""Log in to vitrea connection url
49+
50+
Raises
51+
------
52+
DICOMTrolleyAuthError
53+
If logging in fails
54+
"""
55+
req = Request(
56+
method="POST",
57+
url=self.login_url,
58+
headers={
59+
"X-Userid": self.user,
60+
"X-Password": self.password,
61+
"X-Realm": self.realm,
62+
},
63+
)
64+
response = connection.send(req.prepare())
65+
if response.status_code != 200:
66+
raise DICOMTrolleyAuthError(
67+
f"login failed. {response.status_code}: {response.reason}"
68+
)
69+
return response
70+
71+
def __call__(self, r):
72+
"""Called before sending the request"""
73+
74+
# Make sure keep alive because session is authenticated, not just the
75+
# connection
76+
r.headers["Connection"] = "Keep-Alive"
77+
r.register_hook("response", self.response_hook)
78+
return r
79+
80+
81+
class DICOMTrolleyAuthError(DICOMTrolleyError):
82+
pass

tests/test_auth.py

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
import uuid
2+
from typing import List, Optional
3+
4+
import pytest
5+
import requests
6+
from pydantic.main import BaseModel
7+
from requests import Request
8+
9+
from dicomtrolley.auth import DICOMTrolleyAuthError, VitreaAuth
10+
from tests.mock_responses import MINT_401, MINT_SEARCH_STUDY_LEVEL
11+
12+
13+
def test_vitrea_auth_session_timeout(mock_vitrea_server):
14+
"""If session times out, login should be re-attempted"""
15+
server = mock_vitrea_server
16+
17+
assert requests.Session().get(server.mint_url).status_code == 401
18+
19+
session = requests.Session()
20+
session.auth = VitreaAuth(
21+
login_url=server.login_url,
22+
user=VITREA_CREDENTIALS.user_id,
23+
password=VITREA_CREDENTIALS.password,
24+
realm=VITREA_CREDENTIALS.realm,
25+
)
26+
27+
assert session.get(server.mint_url).status_code == 200
28+
assert len(server._calls_to_login) == 1
29+
# session times out. No 200 responses anymore from server
30+
31+
server.reset_all()
32+
assert len(server._calls_to_login) == 0
33+
34+
# this should trigger an internal re-login
35+
assert session.get(server.mint_url).status_code == 200
36+
assert len(server._calls_to_login) == 1
37+
38+
39+
def test_vitrea_auth_wrong_credentials(mock_vitrea_server):
40+
"""If credentials do not work, make this known with exception"""
41+
server = mock_vitrea_server
42+
session = requests.Session()
43+
session.auth = VitreaAuth(
44+
login_url=server.login_url,
45+
user=VITREA_CREDENTIALS.user_id,
46+
password="WRONG_PASSWORD",
47+
realm=VITREA_CREDENTIALS.realm,
48+
)
49+
with pytest.raises(DICOMTrolleyAuthError):
50+
session.get(server.mint_url)
51+
52+
53+
class VitreaCredentials(BaseModel):
54+
"""Credentials needed to log in to a Vitrea server"""
55+
56+
user_id: str
57+
password: str
58+
realm: str
59+
60+
61+
class VitreaServer:
62+
"""A server that you can log in to the vitrea way, for testing login and session
63+
persistence
64+
65+
requires requests_mock to mock url calls
66+
67+
Notes
68+
-----
69+
Tried to get actual session-cookie based request auth working but could not.
70+
For some reason a Session with a valid cookie does not pass this cookie on
71+
to requests when calling session.get(). I can't figure out why.
72+
In addition, requests_mock does not pass on cookie to session. Known issue
73+
tracked here: https://github.com/jamielennox/requests-mock/pull/143
74+
75+
Opted instead to go for a server-wide 'allow all' switch after succesful login.
76+
This will fulfil testing requirements for now.
77+
"""
78+
79+
def __init__(
80+
self,
81+
allowed_credentials: Optional[VitreaCredentials] = None,
82+
url="http://mockserver",
83+
):
84+
self.url = url
85+
self.login_url = f"{url}/login"
86+
self.mint_url = f"{url}/mint"
87+
88+
self.allowed_credentials = allowed_credentials
89+
self._authorized_tokens: List[str] = []
90+
self._authorized_sessions: List[str] = []
91+
self._calls_to_login: List[Request] = []
92+
self.allow_all = False
93+
94+
def reset_all(self):
95+
"""Remove all credentials and call logs, re-login is required"""
96+
self._authorized_sessions = []
97+
self._authorized_tokens = []
98+
self._calls_to_login = []
99+
self.allow_all = False
100+
101+
def set_allowed_credentials(self, credentials: VitreaCredentials):
102+
self.allowed_credentials = credentials
103+
104+
def register_login_response(self, requests_mock):
105+
"""Register this server's login url with requests mock"""
106+
requests_mock.register_uri(
107+
"POST", url=self.login_url, text=self.create_login_callback()
108+
)
109+
110+
def register_mint_response(self, requests_mock):
111+
requests_mock.register_uri(
112+
"GET", url=self.mint_url, text=self.create_mint_callback()
113+
)
114+
115+
def register_all_responses(self, requests_mock):
116+
"""Make sure requests calls to this servers's urls are routed through here"""
117+
self.register_login_response(requests_mock)
118+
self.register_mint_response(requests_mock)
119+
120+
def add_token(self):
121+
token = str(uuid.uuid4())
122+
self._authorized_tokens.append(token)
123+
return token
124+
125+
def add_session_token(self):
126+
session_token = str(uuid.uuid4())
127+
self._authorized_sessions.append(session_token)
128+
return session_token
129+
130+
def create_login_callback(self):
131+
def callback(request, context):
132+
self._calls_to_login.append(request)
133+
if self.can_login(request):
134+
self.allow_all = True
135+
context.status_code = 200
136+
context.json = {
137+
"access_token": self.add_token(),
138+
"token_type": "Bearer",
139+
}
140+
context.cookies = {"JSESSIONID": self.add_session_token()}
141+
return "success"
142+
else:
143+
context.status_code = 401
144+
context.reason = ("Unauthorized",)
145+
return "Login failed: bad username/password"
146+
147+
return callback
148+
149+
def can_login(self, request) -> bool:
150+
"""Does this request provide the right data to log in?"""
151+
provided = VitreaCredentials(
152+
user_id=request.headers.get("X-Userid"),
153+
password=request.headers.get("X-Password"),
154+
realm=request.headers.get("X-Realm"),
155+
)
156+
return provided == self.allowed_credentials
157+
158+
def create_mint_callback(self):
159+
def callback(request, context):
160+
if self.is_authenticated(request):
161+
context.status_code = MINT_SEARCH_STUDY_LEVEL.status_code
162+
return MINT_SEARCH_STUDY_LEVEL.text
163+
else:
164+
context.status_code = MINT_401.status_code
165+
context.reason = MINT_401.reason
166+
return MINT_401.text
167+
168+
return callback
169+
170+
def is_authenticated(self, request):
171+
"""Is this request allowed to get stuff?
172+
173+
Weirdly enough for the Vitrea Connection 8.2.0.1 there seems to be no
174+
checking of the access token at all. It's all based on the session cookie
175+
"""
176+
if self.allow_all:
177+
return True
178+
if hasattr(request, "cookies"): # this net gets used, see class notes
179+
return (
180+
request.cookies.get("JSESSIONID") in self._authorized_sessions
181+
)
182+
else:
183+
return False
184+
185+
186+
VITREA_CREDENTIALS = VitreaCredentials(
187+
user_id="user", password="pass", realm="some_realm"
188+
)
189+
190+
191+
@pytest.fixture
192+
def mock_vitrea_server(requests_mock):
193+
"""Simulates vitrea Connection login and responses
194+
195+
Valid login with VITREA_CREDENTIALS
196+
"""
197+
server = VitreaServer(allowed_credentials=VITREA_CREDENTIALS)
198+
server.register_all_responses(requests_mock)
199+
return server

0 commit comments

Comments
 (0)