-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathauth_handler.py
183 lines (155 loc) · 6.21 KB
/
auth_handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# Website login class, handles authentication of users, refreshing the jwt key as needed, and requesting user information
import jwt
import requests
import time
from storage_db import storage_db
class AuthHandler():
def __init__(self, refresh_token = None, JWT = None, username = None, password = None):
self.JWT = None
self.refresh_token = None
self.storage = storage_db()
if JWT is not None:
self.JWT = JWT
if username is not None and password is not None:
self.login_with_credentials(username, password)
elif refresh_token is not None:
self.refresh_token = refresh_token
elif self.refresh_token is None:
auth_token = self.storage.retrieve_data('auth')
if auth_token:
self.refresh_token = auth_token['refresh_token']
if JWT in auth_token:
self.JWT = auth_token['JWT']
self.validate_jwt()
else:
self.refresh_login()
@property
def decoded_jwt(self):
return jwt.decode(self.JWT, options={"verify_signature": False})
@property
def logged_in(self):
if self.JWT is not None:
return self.validate_jwt()
else:
return False
@property
def username(self):
if self.logged_in:
return self.decoded_jwt['email']
else:
return None
@property
def roles(self):
return self.check_roles()
def update_jwt(self, jwt_token):
self.JWT = jwt_token
def login_with_credentials(self, username, password):
credentials = {'username': username, 'password': password, 'grant_type': 'password'}
login_response = requests.post('https://deepmake.com/.netlify/identity/token',data=credentials)
if login_response.status_code == 200:
self.refresh_token = login_response.json()['refresh_token']
self.update_jwt(login_response.json()['access_token'])
self.storage.store_data('auth', {'refresh_token': self.refresh_token, 'JWT': login_response.json()['access_token']})
return True
else:
return False
def refresh_login(self):
if self.refresh_token is None:
return False
refresh_response = requests.post('https://deepmake.com/.netlify/identity/token',data={'refresh_token': self.refresh_token, 'grant_type': 'refresh_token'})
if refresh_response.status_code == 200:
self.refresh_token = refresh_response.json()['refresh_token']
self.update_jwt(refresh_response.json()['access_token'])
self.storage.store_data('auth', {'refresh_token': self.refresh_token, 'JWT': refresh_response.json()['access_token']})
return True
else:
return False
def get_user_info(self):
user_info = requests.get('https://deepmake.com/.netlify/identity/user', headers={'Authorization': 'Bearer ' + self.JWT})
if user_info.status_code == 200:
return user_info.json()
else:
return False
def validate_jwt(self):
if self.JWT is None:
return False
if self.decoded_jwt['exp'] < time.time():
return self.refresh_login()
else:
return True
def get_url(self, url):
self.validate_jwt()
headers = {'Authorization': f'Bearer {self.JWT}', 'Cookie': f'nf_jwt={self.JWT}'}
response = None
if url.endswith('.zip'):
return self.large_download(url)
else:
response = requests.get(url, headers=headers)
if response.status_code == 200:
if response.headers['Content-Type'] == 'application/json':
return response.json()
elif response.headers['Content-Type'] == 'application/zip':
return response.content
elif response.headers['Content-Type'] == 'text/html':
return response.text
else:
return False
def large_download(self, url):
start = 0
data = bytearray()
headers = {'Authorization': f'Bearer {self.JWT}', 'Cookie': f'nf_jwt={self.JWT}'}
while True:
try:
# Set the range header to request a subset of the file
headers['Range'] = f'bytes={start}-'
response = requests.get(url, headers=headers, stream=True)
response.raise_for_status() # Check for request errors
if response.status_code > 299:
return response
# Read the content in chunks
for chunk in response.iter_content(chunk_size=1024): # 1KB chunks
if chunk:
data.extend(chunk)
start += len(chunk)
# If the loop completes without errors, break out of the while loop
break
except requests.exceptions.ChunkedEncodingError:
# print("Chunked Encoding Error occurred, retrying...")
continue # Continue the loop and try to reconnect from where it left off
except Exception as e:
# print(f"An error occurred: {e}")
break
return data
def check_roles(self):
if not self.logged_in:
return []
if not self.validate_jwt():
return []
user_info = self.get_user_info()
if 'roles' not in user_info['app_metadata']:
return []
return user_info['app_metadata']['roles']
def check_permissions(self, level=1):
if not self.logged_in:
return False
if level == 0:
return True
roles = self.check_roles()
if len(roles) > 0:
return True
else:
return False
def permission_level(self):
if not self.logged_in:
return False
roles = self.check_roles()
if len(roles) > 0:
return 1
else:
return 0
def logout(self):
self.JWT = None
self.refresh_token = None
self.storage.store_data('auth', {})
return True
auth_handler = AuthHandler()