-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathotp_email.py
More file actions
222 lines (184 loc) · 7.68 KB
/
Copy pathotp_email.py
File metadata and controls
222 lines (184 loc) · 7.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
#!/usr/bin/env python3
"""
Automatic OTP retrieval from email via IMAP.
Connects to your email inbox, finds the latest LinkedIn verification email,
extracts the 6-digit code, and optionally cleans up the email afterward.
Required env vars:
IMAP_HOST - IMAP server (e.g. imap.gmail.com, imap.hostinger.com)
IMAP_PORT - IMAP port (default: 993)
IMAP_USER - Email address to log in with
IMAP_PASS - Email password or app-specific password
Optional env var:
OTP_CLEANUP - Set to "true" to delete verification emails after reading
"""
import imaplib
import email
import os
import re
import time
from email.header import decode_header
def _decode_header(s):
    """Decode an RFC 2047 encoded header value into plain text.

    Args:
        s: Raw header value (``str`` or ``email.header.Header``), or ``None``.

    Returns:
        The decoded header text, or an empty string when ``s`` is ``None``.
        Undecodable bytes are replaced rather than raising, and an unknown
        charset label falls back to UTF-8.
    """
    if s is None:
        return ""

    chunks = []
    prev_plain = None
    for raw, charset in decode_header(s):
        if isinstance(raw, bytes):
            try:
                text = raw.decode(charset or "utf-8", errors="replace")
            except LookupError:
                # Charset label Python has no codec for (e.g. "unknown-8bit"):
                # decode leniently instead of aborting the whole lookup.
                text = raw.decode("utf-8", errors="replace")
        else:
            text = raw
        if not text:
            continue

        plain = charset in (None, "us-ascii")
        if chunks and plain != prev_plain:
            # decode_header() leaves the separating whitespace inside the
            # plain chunk, so only add a space when the boundary between a
            # plain and an encoded chunk has none (e.g. across a folded line).
            if plain:
                needs_space = not text[:1].isspace()
            else:
                needs_space = not chunks[-1][-1:].isspace()
            if needs_space:
                chunks.append(" ")
        chunks.append(text)
        prev_plain = plain
    return "".join(chunks)
def _get_body(msg):
    """Extract the readable text body from an email message.

    For multipart messages the first non-empty, non-attachment ``text/plain``
    part is preferred; otherwise the first non-attachment ``text/html`` part
    is used with its markup removed. Single-part messages are decoded
    directly (and stripped of markup when they are ``text/html``).

    Args:
        msg: A parsed ``email.message.Message``.

    Returns:
        The body text, or an empty string when no usable text is found.
    """

    def _decode_part(part):
        """Return the part's decoded payload as text ("" if it has none)."""
        payload = part.get_payload(decode=True)
        if not payload:
            return ""
        charset = part.get_content_charset() or "utf-8"
        try:
            return payload.decode(charset, errors="replace")
        except LookupError:
            # Unknown charset label: decode leniently instead of raising.
            return payload.decode("utf-8", errors="replace")

    def _html_to_text(html):
        """Reduce HTML to whitespace-normalised visible text."""
        # Drop <style>/<script> blocks and comments first: their contents are
        # not visible text, and CSS colours such as "#333333" would otherwise
        # look like a 6-digit code to the caller.
        text = re.sub(r'(?is)<(script|style)\b.*?</\1\s*>', ' ', html)
        text = re.sub(r'(?s)<!--.*?-->', ' ', text)
        text = re.sub(r'<[^>]+>', ' ', text)
        return re.sub(r'\s+', ' ', text).strip()

    if not msg.is_multipart():
        text = _decode_part(msg)
        if msg.get_content_type() == "text/html":
            return _html_to_text(text)
        return text

    candidates = [
        part for part in msg.walk()
        if "attachment" not in str(part.get("Content-Disposition", ""))
    ]

    for part in candidates:
        if part.get_content_type() == "text/plain":
            text = _decode_part(part)
            # An empty plain-text alternative must not hide a usable HTML one.
            if text.strip():
                return text

    # Fallback to HTML with tags stripped
    for part in candidates:
        if part.get_content_type() == "text/html":
            text = _html_to_text(_decode_part(part))
            if text:
                return text

    return ""
def _extract_code(body):
    """Return the first standalone 6-digit number found in ``body``.

    Args:
        body: Email body text; may be empty or ``None``.

    Returns:
        The 6-digit code as a string, or ``None`` when the body is empty or
        contains no run of exactly six digits delimited by word boundaries.
    """
    if body:
        # Word boundaries keep longer digit runs from matching in part.
        hit = re.search(r'\b(\d{6})\b', body)
        if hit is not None:
            return hit.group(1)
    return None
def _message_age_seconds(msg):
    """Return how many seconds ago ``msg`` was sent, or ``None`` if unknown.

    The age is computed from the message's ``Date`` header against the current
    UTC time. A missing or unparsable header yields ``None``.
    """
    from datetime import datetime, timezone
    from email.utils import parsedate_to_datetime

    raw_date = msg["Date"]
    if not raw_date:
        return None
    try:
        sent = parsedate_to_datetime(str(raw_date))
    except (TypeError, ValueError):
        return None
    if sent is None:
        return None
    if sent.tzinfo is None:
        # A naive result means the header carried no usable offset; treat as UTC.
        sent = sent.replace(tzinfo=timezone.utc)
    return (datetime.now(timezone.utc) - sent).total_seconds()


def fetch_otp(max_wait=90, poll_interval=5, max_age_seconds=300):
    """
    Poll the email inbox for a LinkedIn verification code.

    Each attempt opens a fresh IMAP connection, searches INBOX for unread
    mail from security-noreply@linkedin.com, and inspects the newest messages
    first. When OTP_CLEANUP is "true" the email that supplied the code is
    deleted before returning.

    Args:
        max_wait: Maximum seconds to wait for the email (default: 90)
        poll_interval: Seconds between inbox checks (default: 5)
        max_age_seconds: Ignore emails older than this many seconds
            (default: 300). Pass None to disable the age check. Emails whose
            Date header is missing or unparsable are not filtered.

    Returns:
        The 6-digit code as a string, or None if not found within max_wait
        or if the IMAP credentials are not configured.

    Raises:
        ValueError: If IMAP_PORT is set to a non-numeric value.
    """
    imap_host = os.environ.get("IMAP_HOST", "")
    # An empty IMAP_PORT (common in .env files) means "use the default".
    imap_port = int((os.environ.get("IMAP_PORT") or "").strip() or "993")
    imap_user = os.environ.get("IMAP_USER", "")
    imap_pass = os.environ.get("IMAP_PASS", "")

    if not all([imap_host, imap_user, imap_pass]):
        print("[OTP-EMAIL] IMAP credentials not configured. Set IMAP_HOST, IMAP_USER, IMAP_PASS in .env", flush=True)
        return None

    cleanup = os.environ.get("OTP_CLEANUP", "").lower() == "true"
    subject_keywords = (
        "verification", "security code", "verify",
        "here's your", "pin", "code",
    )

    print(f"[OTP-EMAIL] Waiting for LinkedIn verification email (up to {max_wait}s)...", flush=True)

    deadline = time.time() + max_wait
    attempt = 0

    while time.time() < deadline:
        attempt += 1
        mail = None
        try:
            mail = imaplib.IMAP4_SSL(imap_host, imap_port)
            mail.login(imap_user, imap_pass)
            mail.select("INBOX", readonly=False)

            # Search for LinkedIn security emails
            status, messages = mail.search(None, '(FROM "security-noreply@linkedin.com" UNSEEN)')

            if status != "OK" or not messages[0]:
                print(f"[OTP-EMAIL] Attempt {attempt}: no unread LinkedIn emails yet...", flush=True)
            else:
                # Check newest first
                for num in reversed(messages[0].split()):
                    status, data = mail.fetch(num, "(RFC822)")
                    # A usable FETCH response item is an (envelope, bytes) tuple.
                    if status != "OK" or not data or not isinstance(data[0], tuple):
                        continue

                    msg = email.message_from_bytes(data[0][1])
                    subject = _decode_header(msg["Subject"]).lower()

                    # Only process verification/code emails
                    if not any(kw in subject for kw in subject_keywords):
                        continue

                    # Skip stale emails so an old unread code is never returned.
                    if max_age_seconds is not None:
                        age = _message_age_seconds(msg)
                        if age is not None and age > max_age_seconds:
                            continue

                    code = _extract_code(_get_body(msg))
                    if code:
                        print(f"[OTP-EMAIL] Found verification code in email: '{_decode_header(msg['Subject'])}'", flush=True)

                        # Optionally delete the email
                        if cleanup:
                            mail.store(num, "+FLAGS", "\\Deleted")
                            mail.expunge()
                            print("[OTP-EMAIL] Deleted verification email.", flush=True)

                        return code

                print(f"[OTP-EMAIL] Attempt {attempt}: LinkedIn emails found but no code extracted...", flush=True)

        except imaplib.IMAP4.error as e:
            print(f"[OTP-EMAIL] IMAP error: {e}", flush=True)
        except Exception as e:
            # Best-effort polling: report the failure and retry on the next attempt.
            print(f"[OTP-EMAIL] Error: {e}", flush=True)
        finally:
            # Always close the connection, including on the early return above.
            if mail is not None:
                try:
                    mail.logout()
                except Exception:
                    pass

        # Never sleep past the deadline.
        remaining = deadline - time.time()
        if remaining > 0:
            time.sleep(max(0, min(poll_interval, remaining)))

    print(f"[OTP-EMAIL] Timed out after {max_wait}s. No verification code found.", flush=True)
    return None
def cleanup_linkedin_emails():
    """Delete all LinkedIn verification and device registration emails.

    Connects with the IMAP_HOST / IMAP_PORT / IMAP_USER / IMAP_PASS
    environment variables, scans INBOX for mail from
    security-noreply@linkedin.com, and deletes every message whose subject
    contains one of the verification or device-registration keywords.

    Best-effort: returns silently when credentials are not configured, and
    IMAP or parsing failures are printed rather than raised.

    Raises:
        ValueError: If IMAP_PORT is set to a non-numeric value.
    """
    imap_host = os.environ.get("IMAP_HOST", "")
    # An empty IMAP_PORT (common in .env files) means "use the default".
    imap_port = int((os.environ.get("IMAP_PORT") or "").strip() or "993")
    imap_user = os.environ.get("IMAP_USER", "")
    imap_pass = os.environ.get("IMAP_PASS", "")

    if not all([imap_host, imap_user, imap_pass]):
        return

    subject_keywords = (
        "verification", "security code", "verify",
        "new device", "remember me", "device registration",
    )

    mail = None
    try:
        mail = imaplib.IMAP4_SSL(imap_host, imap_port)
        mail.login(imap_user, imap_pass)
        mail.select("INBOX")

        status, messages = mail.search(None, '(FROM "security-noreply@linkedin.com")')
        if status != "OK" or not messages[0]:
            return

        count = 0
        for num in messages[0].split():
            status, data = mail.fetch(num, "(RFC822)")
            # Skip malformed FETCH responses instead of aborting the cleanup.
            if status != "OK" or not data or not isinstance(data[0], tuple):
                continue
            msg = email.message_from_bytes(data[0][1])
            subject = _decode_header(msg["Subject"]).lower()
            if any(kw in subject for kw in subject_keywords):
                mail.store(num, "+FLAGS", "\\Deleted")
                count += 1

        if count:
            # Flagged messages are only removed once expunged.
            mail.expunge()
            print(f"[OTP-EMAIL] Cleaned up {count} LinkedIn verification email(s).", flush=True)
    except Exception as e:
        print(f"[OTP-EMAIL] Cleanup error: {e}", flush=True)
    finally:
        # Close the connection on every path, including the early return.
        if mail is not None:
            try:
                mail.logout()
            except Exception:
                pass