-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathbrowser_verify.py
More file actions
379 lines (320 loc) · 15.2 KB
/
browser_verify.py
File metadata and controls
379 lines (320 loc) · 15.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
#!/usr/bin/env python3
"""
Use Playwright to visit all sites that need browser verification.
Analyzes rendered page content for auth type, captcha, etc.
"""
import asyncio
import json
import re
import sys
import time
try:
from playwright.async_api import async_playwright, TimeoutError as PWTimeout
except ImportError:
print("Installing playwright...")
import subprocess
subprocess.run([sys.executable, "-m", "pip", "install", "playwright"], check=True)
subprocess.run([sys.executable, "-m", "playwright", "install", "chromium"], check=True)
from playwright.async_api import async_playwright, TimeoutError as PWTimeout
# --- Tunable settings ---
WORKERS = 5 # number of concurrent browser tabs
NAV_TIMEOUT_MS = 10000 # max time for page.goto()
JS_WAIT_MS = 800 # wait for JS rendering after load
JS_WAIT_MS_DEEP = 2500 # longer wait for deep recheck (SPA/JS-heavy)
HARD_LIMIT_S = 15 # hard per-site wall-clock limit (seconds)
# Resource types to block (speeds up loads significantly)
BLOCKED_RESOURCE_TYPES = {"image", "media", "font", "stylesheet"}
async def analyze_page(page, url, deep=False):
"""Analyze a loaded page for auth/captcha/submission info."""
result = {
'auth_type': 'unknown',
'captcha_type': 'none',
'requires_login': False,
'site_status': 'active',
}
try:
html = (await page.content()).lower()
except Exception:
result['site_status'] = 'error'
return result
title = (await page.title()).lower()
# Check if page is dead/404
if any(x in title for x in ['404', 'not found', 'page not found', 'error']):
result['site_status'] = 'not_found'
if any(x in html for x in ['page not found', '404 error', 'this page doesn\'t exist', 'page doesn't exist']):
result['site_status'] = 'not_found'
# Cloudflare challenge
if 'just a moment' in title or 'checking your browser' in html or 'cf-browser-verification' in html:
result['site_status'] = 'cloudflare_blocked'
result['captcha_type'] = 'cloudflare'
return result
# Parked / for-sale domains
if any(x in html for x in ['domain is for sale', 'buy this domain', 'domain may be for sale', 'parked domain', 'this domain is parked']):
result['site_status'] = 'domain_parked'
return result
# --- Auth detection ---
has_google = has_github = has_twitter = has_email_pass = False
has_facebook = has_apple = has_linkedin = False
for pat in ('accounts.google.com', 'googleapis.com/auth', 'google-signin',
'gsi/client', 'sign in with google', 'login with google',
'continue with google', 'google.com/o/oauth', 'google-login',
'auth/google', 'oauth/google', 'btn-google', 'btn_google',
'social-google', 'google oauth', 'google_oauth'):
if pat in html:
has_google = True
break
for pat in ('github.com/login/oauth', 'sign in with github', 'login with github',
'continue with github', 'auth/github', 'oauth/github',
'btn-github', 'btn_github', 'social-github'):
if pat in html:
has_github = True
break
for pat in ('api.twitter.com/oauth', 'sign in with twitter', 'login with twitter',
'continue with twitter', 'auth/twitter', 'sign in with x',
'continue with x', 'login with x', 'btn-twitter', 'social-twitter'):
if pat in html:
has_twitter = True
break
for pat in ('facebook.com/v', 'facebook.com/dialog/oauth', 'connect.facebook.net',
'sign in with facebook', 'login with facebook', 'continue with facebook',
'auth/facebook', 'oauth/facebook', 'btn-facebook', 'btn_facebook',
'social-facebook', 'fb-login', 'fbconnect'):
if pat in html:
has_facebook = True
break
for pat in ('appleid.apple.com/auth', 'sign in with apple', 'continue with apple',
'auth/apple', 'apple-login', 'btn-apple', 'apple-sign-in'):
if pat in html:
has_apple = True
break
for pat in ('linkedin.com/oauth', 'sign in with linkedin', 'login with linkedin',
'continue with linkedin', 'auth/linkedin', 'oauth/linkedin',
'btn-linkedin', 'social-linkedin'):
if pat in html:
has_linkedin = True
break
if re.search(r'<input[^>]*type=["\']password["\']', html):
has_email_pass = True
for pat in ('sign in to continue', 'log in to continue', 'login to submit',
'sign up to submit', 'create an account', 'you must log in',
'please sign in', 'please log in', 'sign in to submit',
'login required', 'sign up to continue'):
if pat in html:
result['requires_login'] = True
break
auths = []
if has_google: auths.append('google')
if has_github: auths.append('github')
if has_twitter: auths.append('twitter')
if has_facebook: auths.append('facebook')
if has_apple: auths.append('apple')
if has_linkedin: auths.append('linkedin')
if has_email_pass: auths.append('email_password')
if not auths:
# Check for <form> tags in HTML
has_form = bool(re.search(r'<form[^>]*>', html))
# Also check for <input> fields outside form wrappers (React/Vue style)
has_inputs = bool(re.search(
r'<input[^>]*type=["\'](?:text|email|url|search|tel)["\']', html))
# Check for contenteditable or textarea
has_textarea = 'textarea' in html or 'contenteditable' in html
# Check for role="form" or data-form attributes (JS frameworks)
has_js_form = bool(re.search(
r'role=["\']form["\']|data-form|ng-form|formik|react-hook-form', html))
if has_form or has_inputs or has_textarea or has_js_form:
result['auth_type'] = 'none'
elif deep:
# --- Deep: use Playwright DOM queries for JS-rendered content ---
try:
dom_info = await page.evaluate('''() => {
const inputs = document.querySelectorAll(
'input[type="text"], input[type="email"], input[type="url"], '
+ 'input[type="password"], input[type="search"], input[type="tel"], '
+ 'input:not([type]), textarea');
const forms = document.querySelectorAll('form, [role="form"]');
const buttons = [...document.querySelectorAll('button, a, [role="button"]')];
const btnTexts = buttons.map(b => b.textContent.toLowerCase().trim())
.filter(t => t.length < 80);
const signupBtns = btnTexts.filter(t =>
/sign.?up|sign.?in|log.?in|register|get started|create account|submit|join/i.test(t));
const oauthBtns = btnTexts.filter(t =>
/google|github|facebook|twitter|apple|linkedin|microsoft|sso/i.test(t));
return {
inputCount: inputs.length,
formCount: forms.length,
signupBtns: signupBtns.slice(0, 10),
oauthBtns: oauthBtns.slice(0, 10),
};
}''')
except Exception:
dom_info = None
if dom_info:
# Re-check OAuth from button text
for btn in dom_info.get('oauthBtns', []):
if 'google' in btn: has_google = True
if 'github' in btn: has_github = True
if 'facebook' in btn: has_facebook = True
if 'twitter' in btn or ' x ' in btn: has_twitter = True
if 'apple' in btn: has_apple = True
if 'linkedin' in btn: has_linkedin = True
# Rebuild auths after DOM check
auths = []
if has_google: auths.append('google')
if has_github: auths.append('github')
if has_twitter: auths.append('twitter')
if has_facebook: auths.append('facebook')
if has_apple: auths.append('apple')
if has_linkedin: auths.append('linkedin')
if auths:
pass # handled below
elif dom_info['formCount'] > 0 or dom_info['inputCount'] > 0:
result['auth_type'] = 'none'
elif dom_info['signupBtns']:
result['auth_type'] = 'none'
result['_submission_hints'] = dom_info['signupBtns']
else:
# Truly no interactive elements found
result['auth_type'] = 'unknown'
else:
result['auth_type'] = 'unknown'
else:
result['auth_type'] = 'unknown'
if auths:
if len(auths) == 1:
result['auth_type'] = auths[0] + '_only' if auths[0] in ('google',) else auths[0]
result['requires_login'] = True
else:
result['auth_type'] = '+'.join(auths)
result['requires_login'] = True
# --- Captcha detection ---
if re.search(r'g-recaptcha|recaptcha/api\.js|grecaptcha', html):
if re.search(r'recaptcha/api\.js\?.*render=|grecaptcha\.execute', html):
result['captcha_type'] = 'recaptcha_v3'
else:
result['captcha_type'] = 'recaptcha_v2'
if re.search(r'hcaptcha\.com|h-captcha', html):
result['captcha_type'] = 'hcaptcha'
if re.search(r'challenges\.cloudflare\.com/turnstile|cf-turnstile', html):
result['captcha_type'] = 'cloudflare_turnstile'
if result['captcha_type'] == 'none' and 'captcha' in html:
result['captcha_type'] = 'captcha_unknown'
return result
async def check_site(context, entry, seq_num, total, data, stats, deep=False):
"""Check a single site using its own page (tab)."""
idx = entry['index']
name = entry['name']
url = entry['url']
tag = f"[{seq_num}/{total}]"
js_wait = JS_WAIT_MS_DEEP if deep else JS_WAIT_MS
page = await context.new_page()
# Block heavy resources to speed things up
await page.route("**/*", lambda route: (
route.abort() if route.request.resource_type in BLOCKED_RESOURCE_TYPES
else route.continue_()
))
try:
t0 = time.monotonic()
async with asyncio.timeout(HARD_LIMIT_S):
await page.goto(url, timeout=NAV_TIMEOUT_MS, wait_until='domcontentloaded')
await page.wait_for_timeout(js_wait)
result = await analyze_page(page, url, deep=deep)
elapsed = time.monotonic() - t0
data[idx]['auth_type'] = result['auth_type']
data[idx]['captcha_type'] = result['captcha_type']
data[idx]['requires_login'] = result['requires_login']
if result['site_status'] != 'active':
data[idx]['site_status'] = result['site_status']
else:
data[idx]['site_status'] = 'active'
if 'analysis_error' in data[idx]:
del data[idx]['analysis_error']
short_status = result['site_status'][:10]
print(f"{tag} {name[:35]:35s} {elapsed:4.1f}s auth={result['auth_type']} cap={result['captcha_type']} st={short_status}")
stats['ok'] += 1
except (PWTimeout, TimeoutError):
elapsed = time.monotonic() - t0
print(f"{tag} {name[:35]:35s} {elapsed:4.1f}s TIMEOUT - skipped")
data[idx]['site_status'] = 'timeout'
stats['timeout'] += 1
except Exception as e:
print(f"{tag} {name[:35]:35s} ERR: {str(e)[:60]}")
data[idx]['site_status'] = 'error'
data[idx]['analysis_error'] = str(e)[:200]
stats['error'] += 1
finally:
try:
await page.close()
except Exception:
pass
async def main():
print("[startup] browser_verify.py starting...", flush=True)
recheck = '--recheck-unknown' in sys.argv or '--deep' in sys.argv
print("[startup] Loading directories.json...", flush=True)
with open('directories.json', 'r') as f:
data = json.load(f)
print(f"[startup] Loaded {len(data)} entries", flush=True)
if recheck:
# Build check list from active unknowns in directories.json
check_list = []
for i, d in enumerate(data):
if d.get('auth_type') == 'unknown' and d.get('site_status') == 'active':
check_list.append({
'index': i,
'name': d['name'],
'url': d.get('submission_url') or d.get('url', ''),
})
mode = 'DEEP recheck'
else:
with open('browser_check_list.json', 'r') as f:
check_list = json.load(f)
mode = 'standard'
total = len(check_list)
print(f"[{mode}] Checking {total} sites with {WORKERS} workers (nav {NAV_TIMEOUT_MS/1000:.0f}s, hard {HARD_LIMIT_S}s)", flush=True)
if total == 0:
print("[startup] Nothing to check — exiting.", flush=True)
return
stats = {'ok': 0, 'timeout': 0, 'error': 0}
semaphore = asyncio.Semaphore(WORKERS)
print("[startup] Launching Playwright chromium...", flush=True)
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
print("[startup] Browser launched, creating context...", flush=True)
context = await browser.new_context(
user_agent='Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
viewport={'width': 1280, 'height': 720},
ignore_https_errors=True,
)
print(f"[startup] Context ready, dispatching {total} tasks...", flush=True)
save_lock = asyncio.Lock()
processed_count = 0
async def worker(entry, seq_num):
nonlocal processed_count
async with semaphore:
await check_site(context, entry, seq_num, total, data, stats, deep=recheck)
processed_count += 1
# Intermediate save every 50 sites
if processed_count % 50 == 0:
async with save_lock:
with open('directories.json', 'w') as f:
json.dump(data, f, indent=2)
print(f" [autosave] {processed_count}/{total} processed", flush=True)
tasks = [worker(entry, i + 1) for i, entry in enumerate(check_list)]
await asyncio.gather(*tasks)
print("[cleanup] Closing browser...", flush=True)
await browser.close()
# Save updated JSON
with open('directories.json', 'w') as f:
json.dump(data, f, indent=2)
# Print summary
print(f"\n=== DONE === ok={stats['ok']} timeout={stats['timeout']} error={stats['error']}")
auth_counts = {}
status_counts = {}
for d in data:
a = d.get('auth_type', 'unknown')
auth_counts[a] = auth_counts.get(a, 0) + 1
s = d.get('site_status', 'unknown')
status_counts[s] = status_counts.get(s, 0) + 1
print("\nAuth:", " ".join(f"{k}:{v}" for k, v in sorted(auth_counts.items(), key=lambda x: -x[1])))
print("Status:", " ".join(f"{k}:{v}" for k, v in sorted(status_counts.items(), key=lambda x: -x[1])))
if __name__ == '__main__':
asyncio.run(main())