11import re
2- from typing import Any , Dict , List , Optional
2+ from typing import Any , Dict , List , Optional , Tuple
33
44from controller .auth import kratos
55from fastapi import Request
1616from submodules .model .business_objects .user import check_email_in_full_admin
1717from submodules .model .models import Organization , Project , User
1818import sqlalchemy
19+ from dataclasses import dataclass
20+ from .kratos import get_identity_is_admin
1921
2022DEV_USER_ID = "741df1c2-a531-43b6-b259-df23bc78e9a2"
2123
2224EMAIL_RE = re .compile (r"^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$" )
2325
2426
27+ @dataclass (frozen = True )
28+ class AdminData :
29+ is_admin : bool
30+ is_full_admin : bool
31+
32+
2533def get_organization_id_by_info (info ) -> Organization :
2634 user = get_user_by_info (info )
2735 if not user or not user .organization_id :
@@ -90,8 +98,8 @@ def check_project_access(info, project_id: str) -> None:
9098 raise AuthManagerError ("Project not found" )
9199
92100
93- def check_admin_access (info ) -> None :
94- if not check_is_admin ( info . context [ "request" ]) :
101+ def check_admin_access (request_state : Any ) -> None :
102+ if not request_state . adm . is_admin :
95103 raise AuthManagerError ("Admin access required" )
96104
97105
@@ -114,25 +122,26 @@ def check_project_access_from_user_id(
114122 return True
115123
116124
117- def check_is_admin (request : Any ) -> bool :
125+
126+ def __check_is_admin_header (request : Request ) -> Tuple [bool , bool ]:
118127 if "Authorization" in request .headers :
119128 jwt_decoded : Dict [str , Any ] = jwt .decode (
120129 request .headers ["Authorization" ].split (" " )[1 ],
121130 options = {"verify_signature" : False },
122131 )
123- subject : Dict [ str , Any ] = jwt_decoded ["session" ]["identity" ]
124- if (
125- subject [ "traits" ][ "email" ]. split ( "@" )[ 1 ] == "kern.ai"
126- and subject [ "verifiable_addresses" ][ 0 ][ "verified" ]
127- ):
128- return True
129- elif (
130- # subject metadata_public can be None so we use or {} instead of get with default
131- ( subject . get ( "metadata_public" ) or {}). get ( "role" ) == "ADMIN"
132- and subject [ "verifiable_addresses" ][ 0 ][ "verified" ]
133- ):
134- return True
135- return False
132+ identity = jwt_decoded ["session" ]["identity" ]
133+ email = identity [ "traits" ][ "email" ]
134+
135+ return get_identity_is_admin ( identity ), check_email_in_full_admin ( email )
136+ return False , False
137+
138+
139+ def parse_admin_info ( request : Any ) -> None :
140+ is_admin , is_full_admin = __check_is_admin_header ( request )
141+ request . state . adm = AdminData (
142+ is_admin = is_admin ,
143+ is_full_admin = is_full_admin ,
144+ )
136145
137146
138147def check_is_single_organization () -> bool :
@@ -148,11 +157,7 @@ def extract_state_info(request: Request, key: str) -> Any:
148157 user = get_user_by_info (request .state .info )
149158 if user and user .organization_id :
150159 value = str (user .organization_id )
151- elif key == "is_admin" :
152- value = check_is_admin (request )
153- elif key == "log_request" :
154- # lazy and => db access only if admin is true
155- if extract_state_info (request , "is_admin" ):
160+ elif key == "log_request" and request .state .adm .is_admin :
156161 value = organization .log_admin_requests (
157162 extract_state_info (request , "organization_id" )
158163 )
@@ -168,15 +173,10 @@ def extract_state_info(request: Request, key: str) -> Any:
168173def check_is_full_admin (request : Any ) -> bool :
169174 if request .url .hostname == "localhost" and request .url .port == 7051 :
170175 return True
171- if check_is_admin (request ):
172- jwt_decoded : Dict [str , Any ] = jwt .decode (
173- request .headers ["Authorization" ].split (" " )[1 ],
174- options = {"verify_signature" : False },
175- )
176- subject : Dict [str , Any ] = jwt_decoded ["session" ]["identity" ]
177- if check_email_in_full_admin (subject ["traits" ]["email" ]):
178- return True
179- return False
176+ state = getattr (request .state , "adm" , None )
177+ if not state :
178+ raise AuthManagerError ("Admin state is not set in request.state.adm" )
179+ return state .is_full_admin
180180
181181
182182def invite_users (
0 commit comments