Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion mftp/env.example.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
## If hoster wish to share their powers with others as well
## Sort of emergency contact?
## That is why it is a list of emails
HOSTER_EMAIL = ["[email protected]", "[email protected]"]
HOSTER_EMAIL = ["[email protected]", "[email protected]"]
HOSTER_NAME = "Arpit Bhardwaj"
HOSTER_ROLL = ROLL_NUMBER
HOSTER_INTERESTED_ROLLS = ["XXYYXXXXX", "NNAANNNNN"]
Expand Down Expand Up @@ -75,3 +75,8 @@
NTFY_PASS = "fakepassword"
## Optional (specific to metakgp [naarad] architecture): Heimdall security token
HEIMDALL_COOKIE = '17ab96bd8ffbe8ca58a78657a918558'


DOCTOR_TOPIC_URL = "https://ntfy.sh/metakgp-mftp" # NTFY topic to send doctors' notifications on
TOPIC_URL = "https://ntfy.sh/metakgp-mftp" # NTFY topic to which mftp sends notifications (set the same topic as in mftp config)
EMAIL = "[email protected]" # Email to send doctors' notifications on
183 changes: 136 additions & 47 deletions mftp/mftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,81 @@
from datetime import datetime
import iitkgp_erp_login.erp as erp

import logging
import re

# mtfp doctor
def check_error(logs):
error_words = ["error", "failed"]
resp = None

if any(re.search(word, logs, re.I) for word in error_words):
logging.info(" ERROR(s) DETECTED!")

try:
resp = send_notification(logs)
except Exception as e:
logging.error(f" FAILED TO SEND NOTIFICATION : {str(e)}")
finally:
logging.info(f" NOTIFICATION STATUS : {resp}")
else:
logging.info(" NO ERROR(s) DETECTED!")

def send_notification(logs, topic_url=env.DOCTOR_TOPIC_URL):
query_params = f"message={logs}"
request_url = f"{topic_url}?{query_params}"

headers = {
"Priority": "5",
"Tags": "warning,mftp,error",
"Title": "MFTP encountered an error",
"Markdown": "yes"
}

if env.EMAIL:
headers["Email"] = env.EMAIL

response = requests.put(request_url, headers=headers)
return response.status_code

def check_downtime(timestamp):
downtime_threshold = 30

if timestamp == "NULL":
logging.info(" LAST SENT TIME IS NULL, SKIPPING DOWNTIME CHECK")
return

try:
last_time = datetime.strptime(timestamp, '%H:%M:%S %d-%m-%Y')
diff = (datetime.now() - last_time).total_seconds() / 60

if diff > downtime_threshold:
logging.info(f" DOWNTIME DETECTED (Last log was {diff:.2f} minutes ago)")

try:
body = f"\nDOWNTIME DETECTED.\n\nPlease check the CDC Noticeboard from your ERP account until MFTP is back online.\n"
body += '''
--------------

⚠️ DISCLAIMER ⚠️

MFTP is unofficial. Not affiliated with CDC, ERP, or Placement Committee. Do not rely solely on MFTP for updates. MFTP-related issues cannot be used as arguments with official authorities.

--------------
'''
resp = send_notification(body, env.TOPIC_URL)
except Exception as e:
logging.error(f" FAILED TO SEND NOTIFICATION : {str(e)}")
resp = None
finally:
logging.info(f" NOTIFICATION STATUS : {resp}")
else:
logging.info(" NO DOWNTIME DETECTED")
except Exception as e:
logging.error(f" FAILED TO CHECK DOWNTIME : {str(e)}")

# mftp

headers = {
"timeout": "20",
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/51.0.2704.79 Chrome/51.0.2704.79 Safari/537.36",
Expand Down Expand Up @@ -44,64 +119,78 @@
)
args = parser.parse_args()

logging.basicConfig(level=logging.INFO)

while True:
now = datetime.now()
print(
f"================ <<: {now.strftime('%H:%M:%S %d-%m-%Y')} :>> ================",
flush=True,
)

print("[ERP LOGIN]", flush=True)
_, ssoToken = erp.login(
headers,
session,
ERPCREDS=env,
OTP_CHECK_INTERVAL=2,
LOGGING=True,
SESSION_STORAGE_FILE=".session",
)
runtime_logs = []

if env.COMPANY_NOTIFIER:
if args.gmail_api or args.smtp:
_, new, modified = company.fetch(session, headers, ssoToken)

filtered = []
if new + modified:
filtered = company.filter(new + modified, "OPEN_N")
if filtered:
latest_ssoToken = session.cookies.get("ssoToken")
companies_mail = mail.format_companies(
latest_ssoToken, filtered
)
mail.send_companies(companies_mail, args.gmail_api, args.smtp)
else:
print("[NO NEW COMPANIES]")
try:
print("[ERP LOGIN]", flush=True)
_, ssoToken = erp.login(
headers,
session,
ERPCREDS=env,
OTP_CHECK_INTERVAL=2,
LOGGING=True,
SESSION_STORAGE_FILE=".session",
)

notice_db = db.NoticeDB(
config={"uri": env.MONGO_URI, "db_name": env.MONGO_DATABASE},
collection_name=env.MONGO_COLLECTION,
)
notice_db.connect()

notices = notice.fetch(headers, session, ssoToken, notice_db)
if notices:
if args.ntfy:
notifications = ntfy.format_notices(notices)
if notifications:
ntfy.send_notices(notifications, notice_db)
if env.COMPANY_NOTIFIER:
if args.gmail_api or args.smtp:
_, new, modified = company.fetch(session, headers, ssoToken)

filtered = []
if new + modified:
filtered = company.filter(new + modified, "OPEN_N")
if filtered:
latest_ssoToken = session.cookies.get("ssoToken")
companies_mail = mail.format_companies(
latest_ssoToken, filtered
)
mail.send_companies(companies_mail, args.gmail_api, args.smtp)
else:
print("[NO NEW COMPANIES]")

notice_db = db.NoticeDB(
config={"uri": env.MONGO_URI, "db_name": env.MONGO_DATABASE},
collection_name=env.MONGO_COLLECTION,
)
notice_db.connect()

notices = notice.fetch(headers, session, ssoToken, notice_db)
if notices:
if args.ntfy:
notifications = ntfy.format_notices(notices)
if notifications:
ntfy.send_notices(notifications, notice_db)
else:
if env.SHORTLIST_NOTIFIER:
shortlists = shortlist.search(notices)
if shortlists:
shortlists_mails = mail.format_shortlists(shortlists)
if shortlists_mails:
mail.send_shortlists(shortlists_mails, args.gmail_api, args.ntfy)
mails = mail.format_notices(notices)
if mails:
mail.send_notices(mails, args.smtp, args.gmail_api, notice_db)
else:
if env.SHORTLIST_NOTIFIER:
shortlists = shortlist.search(notices)
if shortlists:
shortlists_mails = mail.format_shortlists(shortlists)
if shortlists_mails:
mail.send_shortlists(shortlists_mails, args.gmail_api, args.ntfy)
mails = mail.format_notices(notices)
if mails:
mail.send_notices(mails, args.smtp, args.gmail_api, notice_db)
else:
print("[NO NEW NOTICES]", flush=True)
print("[NO NEW NOTICES]", flush=True)

timestamp = now.strftime('%H:%M:%S %d-%m-%Y')

except Exception as e:
timestamp = now.strftime('%H:%M:%S %d-%m-%Y')
runtime_logs.append(str(e))
logging.error(runtime_logs)

check_downtime(timestamp)
check_error("\n".join(runtime_logs))
if args.cron:
break

Expand Down