|
1 | 1 | import decimal
|
| 2 | +import authsignal |
2 | 3 | from authsignal.version import VERSION
|
3 | 4 |
|
4 | 5 | import humps
|
|
9 | 10 |
|
10 | 11 | _UNICODE_STRING = str
|
11 | 12 |
|
12 |
| -API_BASE_URL = 'https://signal.authsignal.com' |
13 |
| -API_CHALLENGE_URL = 'https://api.authsignal.com/v1' |
| 13 | +API_BASE_URL = 'https://api.authsignal.com/v1' |
14 | 14 |
|
15 | 15 | BLOCK = "BLOCK"
|
16 | 16 | ALLOW = "ALLOW"
|
@@ -52,8 +52,8 @@ def __init__(
|
52 | 52 | self.url = api_url
|
53 | 53 | self.timeout = timeout
|
54 | 54 | self.version = version
|
| 55 | + self.api_version = 'v1' |
55 | 56 |
|
56 |
| - |
57 | 57 | def track(self, user_id, action, payload=None, path=None):
|
58 | 58 | """Tracks an action to authsignal, scoped to the user_id and action
|
59 | 59 | Returns the status of the action so that you can determine to whether to continue
|
@@ -152,8 +152,7 @@ def get_user(self, user_id, redirect_url=None, path=None):
|
152 | 152 | def delete_user(self, user_id):
|
153 | 153 | _assert_non_empty_unicode(user_id, 'user_id')
|
154 | 154 |
|
155 |
| - user_id = urllib.parse.quote(user_id) |
156 |
| - path = f'{self.url}/v1/users/{user_id}' |
| 155 | + path = self._delete_user_url(user_id) |
157 | 156 | headers = self._default_headers()
|
158 | 157 |
|
159 | 158 | try:
|
@@ -235,48 +234,71 @@ def enroll_verified_authenticator(self, user_id, authenticator_payload, path=No
|
235 | 234 | except requests.exceptions.RequestException as e:
|
236 | 235 | raise ApiException(str(e), path) from e
|
237 | 236 |
|
238 |
| - def validate_challenge(self, token: str, user_id: Optional[str] = None) -> Dict[str, Any]: |
239 |
| - path = f"{API_CHALLENGE_URL}/validate" |
| 237 | + def validate_challenge(self, token: str, user_id: Optional[str] = None, action: Optional[str] = None) -> Dict[str, Any]: |
| 238 | + path = self._validate_challenge_url() |
240 | 239 | headers = {
|
241 | 240 | 'Content-Type': 'application/json',
|
242 | 241 | 'Accept': 'application/json'
|
243 | 242 | }
|
244 | 243 |
|
| 244 | + payload = {'token': token} |
| 245 | + if user_id is not None: |
| 246 | + payload['userId'] = user_id |
| 247 | + if action is not None: |
| 248 | + payload['action'] = action |
| 249 | + |
245 | 250 | try:
|
246 | 251 | response = self.session.post(
|
247 | 252 | path,
|
248 | 253 | auth=requests.auth.HTTPBasicAuth(self.api_key, ''),
|
249 |
| - data=json.dumps({'token': token, 'userId': user_id}), |
| 254 | + data=json.dumps(payload), |
250 | 255 | headers=headers,
|
251 | 256 | timeout=self.timeout
|
252 | 257 | )
|
253 | 258 |
|
254 | 259 | response_data = humps.decamelize(response.json())
|
255 | 260 |
|
256 |
| - action = response_data.pop('action_code', None) |
257 |
| - |
258 |
| - return {'action': action, **response_data} |
| 261 | + return response_data |
259 | 262 | except requests.exceptions.RequestException as e:
|
260 | 263 | raise ApiException(str(e), path) from e
|
261 | 264 |
|
262 | 265 | def _default_headers(self):
|
263 | 266 | return {'Content-type': 'application/json',
|
264 | 267 | 'Accept': '*/*',
|
265 | 268 | 'User-Agent': self._user_agent()}
|
| 269 | + |
266 | 270 | def _user_agent(self):
|
267 | 271 | return f'Authsignal Python v{self.version}'
|
268 | 272 |
|
269 | 273 | def _track_url(self, user_id, action):
|
270 |
| - return f'{self.url}/v1/users/{user_id}/actions/{action}' |
271 |
| - |
| 274 | + path = self._ensure_versioned_path(f'/users/{user_id}/actions/{action}') |
| 275 | + return f'{self.url}{path}' |
| 276 | + |
272 | 277 | def _get_action_url(self, user_id, action, idempotency_key):
|
273 |
| - return f'{self.url}/v1/users/{user_id}/actions/{action}/{idempotency_key}' |
274 |
| - |
| 278 | + path = self._ensure_versioned_path(f'/users/{user_id}/actions/{action}/{idempotency_key}') |
| 279 | + return f'{self.url}{path}' |
| 280 | + |
275 | 281 | def _get_user_url(self, user_id):
|
276 |
| - return f'{self.url}/v1/users/{user_id}' |
| 282 | + path = self._ensure_versioned_path(f'/users/{user_id}') |
| 283 | + return f'{self.url}{path}' |
277 | 284 |
|
278 | 285 | def _post_enrollment_url(self, user_id):
|
279 |
| - return f'{self.url}/v1/users/{user_id}/authenticators' |
| 286 | + path = self._ensure_versioned_path(f'/users/{user_id}/authenticators') |
| 287 | + return f'{self.url}{path}' |
| 288 | + |
| 289 | + def _validate_challenge_url(self): |
| 290 | + path = self._ensure_versioned_path(f'/validate') |
| 291 | + return f'{self.url}{path}' |
| 292 | + |
| 293 | + def _delete_user_url(self, user_id): |
| 294 | + user_id = urllib.parse.quote(user_id) |
| 295 | + path = self._ensure_versioned_path(f'/users/{user_id}') |
| 296 | + return f'{self.url}{path}' |
| 297 | + |
| 298 | + def _ensure_versioned_path(self, path): |
| 299 | + if not self.url.endswith(f'/{self.api_version}'): |
| 300 | + return f'/{self.api_version}{path}' |
| 301 | + return path |
280 | 302 |
|
281 | 303 | class ApiException(Exception):
|
282 | 304 | def __init__(self, message, url, http_status_code=None, body=None, api_status=None,
|
@@ -307,4 +329,3 @@ def _assert_non_empty_dict(val, name):
|
307 | 329 | raise TypeError('{0} must be a non-empty dict'.format(name))
|
308 | 330 | elif not val:
|
309 | 331 | raise ValueError('{0} must be a non-empty dict'.format(name))
|
310 |
| - |
|
0 commit comments