11"""This script includes the LanisClient to interact with Lanis."""
22
3- from datetime import datetime
3+ import json
4+ import os
5+ from base64 import b64decode , b64encode
6+ from datetime import datetime , timedelta
7+ from pathlib import Path
8+ from time import time
49
510import httpx
11+ from Cryptodome .Cipher import AES
12+ from Cryptodome .Protocol .KDF import scrypt
13+ from Cryptodome .Random import get_random_bytes
614
715from .constants import LOGGER , URL
16+ from .exceptions import (
17+ NoSchoolFoundError ,
18+ WrongCredentialsError ,
19+ )
820from .functions .apps import (
921 App ,
1022 Folder ,
@@ -69,11 +81,13 @@ def __init__( # noqa: D107
6981
7082 self .authenticated = False
7183
84+ self .authentication_method = None
85+
7286 Request .set_headers (self .ad_header )
7387
7488 self .cryptor = Cryptor ()
7589
76- LOGGER .info ("USING VERSION 0.3.2 " )
90+ LOGGER .info ("USING VERSION 0.4.0 ALPHA " )
7791
7892 LOGGER .warning ("LANISAPI IS STILL IN A EARLY STAGE SO EXPECT BUGS." )
7993
@@ -89,18 +103,69 @@ def __del__(self) -> None:
89103 """If the script closes close the parser."""
90104 Request .close ()
91105
92- def close (self ) -> None :
93- """Close the client; you need to do this."""
94- Request .close ()
95- self .authenticated = False
96- LOGGER .info ("Closed current session." )
97-
98106 @property
99107 def authentication_cookies (self ) -> LanisCookie :
100108 """Return ``LanisCookie`` with the authentication data (school id and session id) if authenticated. You can use this to authenticate with Lanis instantly."""
101109 cookies = Request .get_cookies ()
102110 return LanisCookie (cookies .get ("i" , domain = "" ), cookies .get ("sid" ))
103111
112+ def close (self ) -> None :
113+ """Close the client and saves to session.json; you need to do this."""
114+ Request .close ()
115+
116+ self .authenticated = False
117+
118+ # If we already authenticated using the file just update it quickly
119+ if self .authentication_method == "SessionsFile" :
120+ with open ("session.json" , "r+" ) as file :
121+ session_file : dict [str , any ] = json .loads (file .read ())
122+ session_file ["timestamp" ] = time ()
123+ session_file .update (session_file )
124+
125+ file .seek (0 )
126+ file .write (json .dumps (session_file ))
127+
128+ LOGGER .info ("Closed current session." )
129+
130+ return
131+
132+ # Encrypt the session data using AES-GCM and scrypt for the key
133+ salt = get_random_bytes (12 )
134+ key = b64encode (scrypt (self .authentication .password , salt , 16 , 2 ** 14 , 8 , 1 ))
135+ cipher = AES .new (key , AES .MODE_GCM , nonce = salt )
136+
137+ session_id = cipher .encrypt_and_digest (
138+ self .authentication_cookies .session_id .encode ()
139+ )
140+
141+ # session_id: Append salt to beginning (16 chars) then mac (32 chars) and then the ciphertext.
142+ session_data = {
143+ "school_id" : self .authentication_cookies .school_id ,
144+ "session_id" : f"{ b64encode (salt ).decode ()} { b64encode (session_id [1 ]).decode ()} { b64encode (session_id [0 ]).decode ()} " ,
145+ "timestamp" : time (),
146+ }
147+
148+ # If file exist update it.
149+ if Path ("session.json" ).exists ():
150+ with open ("session.json" , "r+" ) as file :
151+ raw_session_file = file .read ()
152+ # If empty
153+ if not raw_session_file :
154+ file .write (json .dumps (session_data ))
155+ LOGGER .info ("Closed current session." )
156+ return
157+
158+ session_file : dict [str , any ] = json .loads (raw_session_file )
159+ session_file .update (session_data )
160+
161+ file .seek (0 )
162+ file .write (json .dumps (session_file ))
163+ else :
164+ with open ("session.json" , "w" ) as file :
165+ file .write (json .dumps (session_data ))
166+
167+ LOGGER .info ("Closed current session." )
168+
104169 @handle_exceptions
105170 def get_schools (self ) -> list [dict [str , str ]]:
106171 """Return all schools with their id, name and city.
@@ -113,9 +178,14 @@ def get_schools(self) -> list[dict[str, str]]:
113178 return _get_schools (self .save )
114179
115180 @handle_exceptions
116- def authenticate (self ) -> None :
181+ def authenticate (self , force : bool = False ) -> None :
117182 """Log into the school portal and sets the session id in the auth_cookies.
118183
184+ Parameters
185+ ----------
186+ force : bool, optional
187+ If True it always makes a new session with Lanis, by default False
188+
119189 Note
120190 ----
121191 Supports only the new system (login.schulportal.hessen.de).
@@ -127,35 +197,96 @@ def authenticate(self) -> None:
127197
128198 school_id : int
129199
130- if isinstance (self .authentication , LanisCookie ):
131- Request .set_cookies (
132- {
133- "i" : self .authentication .school_id ,
134- "sid" : self .authentication .session_id ,
135- }
136- )
137- LOGGER .info (
138- "Authenticate: Using cookies to authenticate, skip authentication."
139- )
140- else :
200+ if not force :
201+ if isinstance (self .authentication , LanisCookie ):
202+ Request .set_cookies (
203+ {
204+ "i" : self .authentication .school_id ,
205+ "sid" : self .authentication .session_id ,
206+ }
207+ )
208+ LOGGER .info (
209+ "Authenticate: Using cookies to authenticate, skip authentication."
210+ )
211+ self .authentication_method = "LanisCookie"
212+ elif Path ("session.json" ).exists ():
213+ with open ("session.json" , "r" ) as file :
214+ raw_session_file = file .read ()
215+
216+ # If session file is empty return forced authenticate
217+ if raw_session_file :
218+ try :
219+ session_file : dict [str , any ] = json .loads (raw_session_file )
220+ except json .JSONDecodeError :
221+ LOGGER .warning ("Authenticate: session.json file is corrupted." )
222+ os .remove ("session.json" )
223+ self .authenticate (True )
224+ return
225+ else :
226+ LOGGER .info ("Authenticate: session.json file is empty." )
227+ self .authenticate (True )
228+ return
229+
230+ # If session data is not older then 100 minutes.
231+ if session_file and datetime .fromtimestamp (
232+ session_file ["timestamp" ]
233+ ) > datetime .now () + timedelta (minutes = - 100 ):
234+ # Preparing the decrypt of the session id.
235+ salt = b64decode (session_file ["session_id" ][:16 ])
236+ mac = b64decode (session_file ["session_id" ][16 :40 ])
237+ key = b64encode (
238+ scrypt (
239+ self .authentication .password , salt , 16 , 2 ** 14 , 8 , 1
240+ )
241+ )
242+ cipher = AES .new (key , AES .MODE_GCM , nonce = salt )
243+
244+ # Decrypt and verify if the mac is right.
245+ try :
246+ session_id = cipher .decrypt_and_verify (
247+ b64decode (session_file ["session_id" ][40 :].encode ()), mac
248+ ).decode ()
249+ except ValueError :
250+ LOGGER .info ("Authenticate: session.json file is corrupted." )
251+ self .authenticate (True )
252+ return
253+
254+ Request .set_cookies (
255+ {
256+ "i" : session_file ["school_id" ],
257+ "sid" : session_id ,
258+ }
259+ )
260+ else :
261+ LOGGER .info ("Authenticate: Session.json file is outdated." )
262+ os .remove ("session.json" )
263+ self .authenticate (True )
264+ return
265+
266+ LOGGER .info ("Authenticate: Using found session.json file." )
267+ self .authentication_method = "SessionsFile"
268+
269+ if force or not (
270+ Path ("session.json" ).exists ()
271+ or isinstance (self .authentication , LanisCookie )
272+ ):
141273 # Check if a id or school and place is provided.
142274 if isinstance (self .authentication .school , str ):
143275 school_id = self .authentication .school
144276 else :
145277 schools = self .get_schools ()
146278
279+ # Try to get wanted school with a one liner generator.
147280 try :
148281 school_id = next (
149282 school
150283 for school in schools
151284 if school ["Name" ] == self .authentication .school .name
152285 and school ["Ort" ] == self .authentication .school .city
153286 )["Id" ]
154- except StopIteration :
155- LOGGER .warning (
156- "Authenticate: School doesn't exist, check for right spelling."
157- )
158- return
287+ except StopIteration as err :
288+ msg = "School doesn't exist, check for right spelling."
289+ raise NoSchoolFoundError (msg ) from err
159290
160291 # Get new session (value: SPH-Session) by posting to login page.
161292 response_session = get_session (
@@ -165,10 +296,8 @@ def authenticate(self) -> None:
165296
166297 if not response_session ["location" ]:
167298 # It also could be other problems, Lanis can be very finicky.
168- LOGGER .error (
169- "Authenticate: Could not log in, possibly wrong credentials."
170- )
171- return
299+ msg = "Could not log in, possibly wrong credentials."
300+ raise WrongCredentialsError (msg )
172301
173302 # Get authentication url to get sid.
174303 auth_url = get_authentication_url (response_cookies )
@@ -178,6 +307,8 @@ def authenticate(self) -> None:
178307 get_authentication_sid (auth_url , response_cookies , schoolid = school_id )
179308 )
180309
310+ self .authentication_method = "LanisAccount"
311+
181312 # Tell Lanis how to encrypt
182313 if not self .cryptor .authenticate ():
183314 LOGGER .error ("Authenticate: Couldn't handshake with Lanis." )
0 commit comments