Skip to content

Commit fb59c7c

Browse files
committed
add jupyter version of token fetch
1 parent ccb7c51 commit fb59c7c

File tree

1 file changed

+269
-0
lines changed

1 file changed

+269
-0
lines changed
Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
import os
2+
import json
3+
import time
4+
from getpass import getpass
5+
from dotenv import load_dotenv
6+
from urllib.parse import urlparse, parse_qs
7+
from http.server import BaseHTTPRequestHandler, HTTPServer
8+
import webbrowser
9+
10+
# Jupyter-compatible imports
11+
try:
12+
from IPython.display import display, HTML
13+
from ipywidgets import widgets
14+
IN_JUPYTER = True
15+
except ImportError:
16+
IN_JUPYTER = False
17+
18+
from field_manager_python_client import AuthenticatedClient, Client
19+
from field_manager_python_client.api.public import (
20+
get_organization_by_email_address_public_organizations_email_address_get,
21+
get_organization_information_public_organizations_organization_id_information_get,
22+
)
23+
from keycloak import KeycloakOpenID
24+
25+
# -----------------------------
26+
# Load environment variables
27+
# -----------------------------
28+
load_dotenv()
29+
30+
KEYCLOAK_SERVER_URL = os.getenv(
31+
"KEYCLOAK_SERVER_URL", "https://keycloak.test.ngiapi.no/auth/"
32+
)
33+
KEYCLOAK_REALM = os.getenv("KEYCLOAK_REALM", "tenant-geohub-public")
34+
KEYCLOAK_CLIENT_ID = os.getenv("KEYCLOAK_CLIENT_ID", "fieldmanager-client")
35+
DEFAULT_SCOPE = os.getenv("KEYCLOAK_SCOPE", "openid")
36+
BASE_URL = os.getenv("BASE_URL", "https://app.test.fieldmanager.io/api/location")
37+
DEFAULT_EMAIL = os.getenv("DEFAULT_EMAIL", "[email protected]")
38+
39+
# -----------------------------------
40+
# Set the token file under ~/.secrets/
41+
# -----------------------------------
42+
secret_folder = os.path.expanduser("~/.secrets")
43+
os.makedirs(secret_folder, exist_ok=True) # Create ~/.secrets if it doesn't exist
44+
# os.chmod(secret_folder, 0o700) # Ensure the folder has 700 (drwx------)
45+
46+
TOKEN_STORE_FILE = os.path.join(secret_folder, "field_manager_token_store.json")
47+
48+
# Initialize public client
49+
public_client = Client(base_url=BASE_URL)
50+
51+
# ---------------------------------------------------
52+
# Token manager class with complete implementation
53+
# ---------------------------------------------------
54+
class TokenManager:
55+
def __init__(self, keycloak_openid, initial_token=None):
56+
self.keycloak = keycloak_openid
57+
self.storage = JupyterStorage() if IN_JUPYTER else FileStorage(TOKEN_STORE_FILE)
58+
59+
if initial_token:
60+
self._update_tokens(initial_token)
61+
else:
62+
self._load_tokens()
63+
64+
def _update_tokens(self, token):
65+
self.access_token = token["access_token"]
66+
self.refresh_token = token.get("refresh_token")
67+
self.expires_at = time.time() + token["expires_in"]
68+
self._save_tokens()
69+
70+
def _load_tokens(self):
71+
tokens = self.storage.load()
72+
self.access_token = tokens.get("access_token")
73+
self.refresh_token = tokens.get("refresh_token")
74+
self.expires_at = tokens.get("expires_at", 0)
75+
76+
def _save_tokens(self):
77+
self.storage.save({
78+
"access_token": self.access_token,
79+
"refresh_token": self.refresh_token,
80+
"expires_at": self.expires_at
81+
})
82+
83+
def is_access_token_valid(self):
84+
return time.time() < self.expires_at - 60
85+
86+
def refresh_access_token(self):
87+
if not self.refresh_token:
88+
return False
89+
try:
90+
new_token = self.keycloak.refresh_token(self.refresh_token)
91+
self._update_tokens(new_token)
92+
return True
93+
except Exception as e:
94+
print(f"Failed to refresh token: {e}")
95+
return False
96+
97+
def get_valid_token(self):
98+
if self.is_access_token_valid():
99+
return self.access_token
100+
if self.refresh_access_token():
101+
return self.access_token
102+
return None
103+
104+
# ---------------------------
105+
# Storage implementations
106+
# ---------------------------
107+
class FileStorage:
108+
def __init__(self, filename):
109+
self.filename = filename
110+
111+
def save(self, data):
112+
try:
113+
with open(self.filename, 'w') as f:
114+
json.dump(data, f)
115+
except Exception as e:
116+
print(f"Error saving tokens: {e}")
117+
118+
def load(self):
119+
try:
120+
if os.path.exists(self.filename):
121+
with open(self.filename, 'r') as f:
122+
return json.load(f)
123+
except Exception as e:
124+
print(f"Error loading tokens: {e}")
125+
return {}
126+
127+
class JupyterStorage:
128+
def save(self, data):
129+
try:
130+
from IPython.core.interactiveshell import InteractiveShell
131+
InteractiveShell.instance().user_ns['_fm_tokens'] = data
132+
except Exception as e:
133+
print(f"Error saving to Jupyter storage: {e}")
134+
135+
def load(self):
136+
try:
137+
from IPython.core.interactiveshell import InteractiveShell
138+
return InteractiveShell.instance().user_ns.get('_fm_tokens', {})
139+
except Exception as e:
140+
print(f"Error loading from Jupyter storage: {e}")
141+
return {}
142+
143+
# ---------------------------
144+
# Authentication flows
145+
# ---------------------------
146+
def authenticate():
147+
keycloak_openid = KeycloakOpenID(
148+
server_url=KEYCLOAK_SERVER_URL,
149+
client_id=KEYCLOAK_CLIENT_ID,
150+
realm_name=KEYCLOAK_REALM,
151+
)
152+
153+
token_manager = TokenManager(keycloak_openid)
154+
155+
if valid_token := token_manager.get_valid_token():
156+
print("Using cached tokens")
157+
return AuthenticatedClient(base_url=BASE_URL, token=valid_token)
158+
159+
email = _get_email_input()
160+
161+
auth_info = get_auth_method(email)
162+
auth_method = auth_info["auth_method"]
163+
authentication_alias = auth_info.get("authentication_alias")
164+
165+
if auth_method == "sso":
166+
token_manager = _sso_flow(keycloak_openid, authentication_alias)
167+
elif auth_method == "password":
168+
token_manager = _password_flow(keycloak_openid, email)
169+
else:
170+
raise ValueError("Unsupported authentication method")
171+
172+
return AuthenticatedClient(base_url=BASE_URL, token=token_manager.get_valid_token())
173+
174+
def _sso_flow(keycloak_openid, authentication_alias):
175+
redirect_uri = "urn:ietf:wg:oauth:2.0:oob" if IN_JUPYTER else "http://localhost:8000"
176+
177+
auth_url = keycloak_openid.auth_url(
178+
redirect_uri=redirect_uri,
179+
scope=DEFAULT_SCOPE,
180+
)
181+
182+
if authentication_alias:
183+
auth_url += f"&kc_idp_hint={authentication_alias}"
184+
185+
if IN_JUPYTER:
186+
_show_jupyter_auth_prompt(auth_url)
187+
code = input("Paste authorization code: ").strip()
188+
else:
189+
webbrowser.open_new(auth_url)
190+
code = _local_server_capture()
191+
192+
token = keycloak_openid.token(
193+
grant_type="authorization_code",
194+
code=code,
195+
redirect_uri=redirect_uri,
196+
scope=DEFAULT_SCOPE,
197+
)
198+
return TokenManager(keycloak_openid, token)
199+
200+
def _password_flow(keycloak_openid, email):
201+
password = getpass("Enter password: ")
202+
token = keycloak_openid.token(
203+
username=email,
204+
password=password,
205+
scope=DEFAULT_SCOPE,
206+
)
207+
return TokenManager(keycloak_openid, token)
208+
209+
# ---------------------------
210+
# Helper functions
211+
# ---------------------------
212+
def _get_email_input():
213+
if IN_JUPYTER:
214+
email_widget = widgets.Text(value=DEFAULT_EMAIL, description="Email:")
215+
display(email_widget)
216+
return email_widget.value
217+
else:
218+
print(f"Default email: {DEFAULT_EMAIL}")
219+
use_default = input("Use default? (y/n): ").strip().lower()
220+
return DEFAULT_EMAIL if use_default == 'y' else input("Enter email: ").strip()
221+
222+
def _show_jupyter_auth_prompt(auth_url):
223+
display(HTML(f'''
224+
<div style="border: 1px solid #e0e0e0; padding: 20px; border-radius: 5px; margin: 10px 0;">
225+
<h3 style="margin-top: 0;">🔑 SSO Authentication Required</h3>
226+
<p>1. <a href="{auth_url}" target="_blank" style="color: #0066cc; text-decoration: none;">
227+
Click here to authenticate
228+
</a></p>
229+
<p>2. After completing authentication, paste the authorization code below:</p>
230+
</div>
231+
'''))
232+
233+
def _local_server_capture():
234+
class AuthHandler(BaseHTTPRequestHandler):
235+
def do_GET(self):
236+
query = parse_qs(urlparse(self.path).query)
237+
self.server.auth_code = query.get("code", [None])[0]
238+
self.send_response(200)
239+
self.end_headers()
240+
self.wfile.write(b"Authentication complete. You may close this window.")
241+
242+
server = HTTPServer(("localhost", 8000), AuthHandler)
243+
print("Waiting for authorization code in browser...")
244+
server.handle_request()
245+
print("Handled request.")
246+
return server.auth_code
247+
248+
def get_auth_method(email):
249+
try:
250+
org = get_organization_by_email_address_public_organizations_email_address_get.sync(
251+
client=public_client, email_address=email
252+
)
253+
org_info = get_organization_information_public_organizations_organization_id_information_get.sync(
254+
client=public_client, organization_id=org.organization_id
255+
)
256+
return {
257+
"auth_method": "sso" if org_info.authentication_alias else "password",
258+
"authentication_alias": org_info.authentication_alias
259+
}
260+
except Exception as e:
261+
print(f"Error determining auth method: {e}")
262+
return {"auth_method": "password", "authentication_alias": None}
263+
264+
# ---------------------------
265+
# Entry point
266+
# ---------------------------
267+
if __name__ == "__main__":
268+
client = authenticate()
269+
print("Authentication successful. Client ready.")

0 commit comments

Comments
 (0)