Skip to content

Commit ebe2351

Browse files
committed
feat: api ci test; auto otp
1 parent 8d6bc72 commit ebe2351

File tree

3 files changed

+105
-10
lines changed

3 files changed

+105
-10
lines changed

.dockerignore

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,8 @@ README.md
1111
credentials.json
1212
token.json
1313
erpcreds.py
14-
erpcreds.py.template
14+
erpcreds.py.template
15+
16+
api-integration-test.py
17+
.Dockerfile-dev
18+
docker-compose-dev.yaml

api-integration-test.py

Lines changed: 94 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import argparse
12
import requests
23
import erpcreds
34

@@ -15,20 +16,19 @@ def get_secret_question():
1516
if res_json["status"] == "success":
1617
secret_question = res_json["SECRET_QUESTION"]
1718
session_token = res_json["SESSION_TOKEN"]
18-
print(f"✅ Secret Question: {secret_question}")
1919
print(f"✅ Session Token: {session_token}")
2020
return secret_question, session_token
2121
else:
2222
print(f"❌ Error: {res_json['message']}")
2323
exit(1)
2424

25-
def request_otp(session_token, secret_question):
25+
def request_otp(session_token, secret_answer):
2626
url = f"{BASE_URL}/request-otp"
2727
headers = {"Session-Token": session_token}
2828
data = {
2929
"roll_number": erpcreds.ROLL_NUMBER,
3030
"password": erpcreds.PASSWORD,
31-
"secret_answer": erpcreds.SECURITY_QUESTIONS_ANSWERS[secret_question],
31+
"secret_answer": secret_answer,
3232
}
3333

3434
response = session.post(url, headers=headers, data=data)
@@ -40,13 +40,13 @@ def request_otp(session_token, secret_question):
4040
print(f"❌ Error: {res_json['message']}")
4141
exit(1)
4242

43-
def login(session_token, secret_question, otp):
43+
def login(session_token, secret_answer, otp):
4444
url = f"{BASE_URL}/login"
4545
headers = {"Session-Token": session_token}
4646
data = {
4747
"roll_number": erpcreds.ROLL_NUMBER,
4848
"password": erpcreds.PASSWORD,
49-
"secret_answer": erpcreds.SECURITY_QUESTIONS_ANSWERS[secret_question],
49+
"secret_answer": secret_answer,
5050
"otp": otp,
5151
}
5252

@@ -76,11 +76,96 @@ def download_timetable(sso_token):
7676
print(f"❌ Error: {response.json().get('message', 'Unknown error')}")
7777
exit(1)
7878

79+
def _getOTP(OTP_CHECK_INTERVAL: float, session_token: str, log: bool = False):
80+
import time
81+
import base64
82+
from googleapiclient.discovery import build
83+
84+
85+
def getMailID(service):
86+
subject = "OTP for Sign In in ERP Portal of IIT Kharagpur"
87+
88+
query = f"subject:{subject}"
89+
results = service.users().messages().list(userId="me", q=query, maxResults=1).execute()
90+
messages = results.get("messages", [])
91+
92+
if messages:
93+
message = service.users().messages().get(userId="me", id=messages[0]["id"]).execute()
94+
return message["id"]
95+
96+
return None
97+
98+
def generate_token():
99+
"""Generates token.json from credentials.json file with readonly access to mails."""
100+
import os
101+
from google.oauth2.credentials import Credentials
102+
from google.auth.transport.requests import Request
103+
from google_auth_oauthlib.flow import InstalledAppFlow
104+
105+
token_path = os.path.join(os.getcwd(), "token.json")
106+
credentials_path = os.path.join(os.getcwd(), "credentials.json")
107+
108+
scopes = ["https://www.googleapis.com/auth/gmail.readonly"]
109+
110+
creds = None
111+
if os.path.exists(token_path):
112+
creds = Credentials.from_authorized_user_file(token_path, scopes)
113+
if not creds or not creds.valid:
114+
if creds and creds.expired and creds.refresh_token:
115+
creds.refresh(Request())
116+
else:
117+
flow = InstalledAppFlow.from_client_secrets_file(credentials_path, scopes)
118+
creds = flow.run_local_server(port=0)
119+
120+
if not os.path.exists(token_path):
121+
with open(token_path, "w") as token:
122+
token.write(creds.to_json())
123+
124+
return creds
125+
126+
creds = generate_token()
127+
service = build("gmail", "v1", credentials=creds)
128+
129+
latest_mail_id = getMailID(service)
130+
request_otp(session_token, secret_answer)
131+
if log:
132+
print("⏱︎ Waiting for OTP ...")
133+
134+
while True:
135+
if (mail_id := getMailID(service)) != latest_mail_id:
136+
break
137+
time.sleep(OTP_CHECK_INTERVAL)
138+
139+
mail = service.users().messages().get(userId="me", id=mail_id).execute()
140+
if "body" in mail["payload"]:
141+
body_data = mail["payload"]["body"]["data"]
142+
decoded_body_data = base64.urlsafe_b64decode(body_data).decode("utf-8")
143+
otp = [part for part in decoded_body_data.split() if part.isdigit()][-1]
144+
145+
return otp
146+
147+
def _parse_args():
148+
parser = argparse.ArgumentParser()
149+
parser.add_argument(
150+
"-o",
151+
"--otp",
152+
action="store_true",
153+
help="Automate OTP fetching",
154+
)
155+
args = parser.parse_args()
156+
return args
157+
79158
if __name__ == "__main__":
80159
secret_question, session_token = get_secret_question()
81-
request_otp(session_token, secret_question)
82-
83-
otp = input("Enter OTP received via email: ").strip()
84-
sso_token = login(session_token, secret_question, otp)
160+
secret_answer = erpcreds.SECURITY_QUESTIONS_ANSWERS[secret_question]
161+
162+
args = _parse_args()
163+
if args.otp:
164+
otp = _getOTP(OTP_CHECK_INTERVAL=2, session_token=session_token, log=True)
165+
else:
166+
request_otp(session_token, secret_answer)
167+
otp = input("Enter OTP received via email: ").strip()
168+
169+
sso_token = login(session_token, secret_answer, otp)
85170

86171
download_timetable(sso_token)

requirements.ci.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
argparse
2+
requests==2.32.3
3+
google-api-python-client==2.160.0
4+
google-auth==2.38.0
5+
google-auth-httplib2==0.2.0
6+
google-auth-oauthlib==1.2.1

0 commit comments

Comments
 (0)