|
3 | 3 | from authsignal.version import VERSION
|
4 | 4 |
|
5 | 5 | import humps
|
| 6 | +from typing import Dict, Any, Optional |
6 | 7 | import json
|
7 | 8 | import requests
|
| 9 | + |
8 | 10 | _UNICODE_STRING = str
|
9 | 11 |
|
10 | 12 | API_BASE_URL = 'https://signal.authsignal.com'
|
| 13 | +API_CHALLENGE_URL = 'https://api.authsignal.com/v1' |
11 | 14 |
|
12 | 15 | BLOCK = "BLOCK"
|
13 | 16 | ALLOW = "ALLOW"
|
@@ -175,31 +178,29 @@ def enroll_verified_authenticator(self, user_id, authenticator_payload, path=No
|
175 | 178 | except requests.exceptions.RequestException as e:
|
176 | 179 | raise ApiException(str(e), path) from e
|
177 | 180 |
|
178 |
| - def validate_challenge(self, token, user_id=None): |
179 |
| - try: |
180 |
| - decoded_token = jwt.decode(token, self.api_key, algorithms=["HS256"], options={'verify_aud': False}) |
181 |
| - |
182 |
| - except jwt.DecodeError as e: |
183 |
| - print(e) |
184 |
| - return |
185 |
| - |
186 |
| - decoded_user_id = decoded_token["other"]["userId"] |
187 |
| - action = decoded_token["other"]["actionCode"] |
188 |
| - idempotency_key = decoded_token["other"]["idempotencyKey"] |
| 181 | + def validate_challenge(self, token: str, user_id: Optional[str] = None) -> Dict[str, Any]: |
| 182 | + path = f"{API_CHALLENGE_URL}/validate" |
| 183 | + headers = { |
| 184 | + 'Content-Type': 'application/json', |
| 185 | + 'Accept': 'application/json' |
| 186 | + } |
189 | 187 |
|
190 |
| - if user_id and user_id != decoded_user_id: |
191 |
| - return {"user_id": decoded_user_id, "success": False, "state": None} |
192 |
| - |
193 |
| - if action and idempotency_key: |
194 |
| - action_result = self.get_action(user_id=decoded_user_id, action=action, idempotency_key=idempotency_key) |
195 |
| - |
196 |
| - if action_result: |
197 |
| - state = action_result["state"] |
198 |
| - success = state == "CHALLENGE_SUCCEEDED" |
| 188 | + try: |
| 189 | + response = self.session.post( |
| 190 | + path, |
| 191 | + auth=requests.auth.HTTPBasicAuth(self.api_key, ''), |
| 192 | + data=json.dumps({'token': token, 'userId': user_id}), |
| 193 | + headers=headers, |
| 194 | + timeout=self.timeout |
| 195 | + ) |
| 196 | + |
| 197 | + response_data = humps.decamelize(response.json()) |
199 | 198 |
|
200 |
| - return {"user_id": decoded_user_id, "success": success, "state": state, "action": action} |
| 199 | + action = response_data.pop('action_code', None) |
201 | 200 |
|
202 |
| - return {"userId": decoded_user_id, "success": False, "state": None} |
| 201 | + return {'action': action, **response_data} |
| 202 | + except requests.exceptions.RequestException as e: |
| 203 | + raise ApiException(str(e), path) from e |
203 | 204 |
|
204 | 205 | def _default_headers(self):
|
205 | 206 | return {'Content-type': 'application/json',
|
|
0 commit comments