-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathunshortener.py
More file actions
86 lines (76 loc) · 2.73 KB
/
unshortener.py
File metadata and controls
86 lines (76 loc) · 2.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import os
import json
import multiprocessing
import requests
import time
import tqdm
import signal
import sys
from bs4 import BeautifulSoup
# Base URL of the web form used to resolve shortened links.
resolver_url = 'https://unshorten.me/'


class Unshortener(object):
    """Resolve shortened URLs by submitting them to the ``resolver_url`` form.

    Each instance owns one ``requests.Session`` plus the CSRF token scraped
    from the resolver's landing page, and keeps a ``short -> resolved`` dict
    so that a URL is only sent to the resolver once.
    """

    def __init__(self, mappings=None):
        """Open a session against the resolver and read its CSRF token.

        Args:
            mappings: Optional dict of already-resolved URLs used as a cache.
                It is stored by reference (not copied) and updated in place
                by :meth:`unshorten`. When omitted, a fresh empty dict is
                created for this instance.

        Raises:
            IndexError: If the landing page contains no
                ``csrfmiddlewaretoken`` input.
        """
        self.session = requests.Session()
        res_text = self.session.get(resolver_url).text
        soup = BeautifulSoup(res_text, 'html.parser')
        self.csrf = soup.select('input[name="csrfmiddlewaretoken"]')[0]['value']
        # Compare against None rather than truthiness: an empty dict passed in
        # by the caller must still be shared with the caller, not replaced.
        self.mappings = {} if mappings is None else mappings

    def unshorten(self, url, handle_error=True):
        """Return the resolved target of ``url``, consulting the cache first.

        Args:
            url: The (possibly shortened) URL to resolve.
            handle_error: Controls the result when the resolver's response
                does not contain a resolved URL. If true, ``url`` itself is
                returned; otherwise ``None`` is returned.

        Returns:
            The resolved URL, or the fallback described above. Whatever is
            returned is also stored in ``self.mappings`` under ``url``, so a
            fallback value is served from the cache on later calls too.
        """
        if url in self.mappings:
            return self.mappings[url]

        response = self.session.post(
            resolver_url,
            headers={'Referer': resolver_url},
            data={'csrfmiddlewaretoken': self.csrf, 'url': url},
        )
        soup = BeautifulSoup(response.text, 'html.parser')
        # The resolved URL is read from the first <code> element under an
        # <h3> inside the "features" section of the response page.
        matches = soup.select('section[id="features"] h3 code')
        if matches:
            source_url = matches[0].get_text()
        else:
            # An explicit "no match" check instead of a bare ``except`` keeps
            # KeyboardInterrupt / SystemExit from being swallowed here.
            print('ERROR for', url)
            source_url = url if handle_error else None

        self.mappings[url] = source_url
        return source_url
def func(params):
    """Resolve one URL with the unshortener it was paired with.

    Args:
        params: A ``(url, unshortener)`` pair, packed into a single argument
            so the function can be mapped over a list of such pairs.

    Returns:
        A ``(url, resolved)`` tuple, where ``resolved`` is whatever the
        unshortener's ``unshorten(url)`` returned.
    """
    short_url, resolver = params
    return (short_url, resolver.unshorten(short_url))
def unshorten_multiprocess(url_list, mappings=None, pool_size=4):
    """Resolve many URLs in parallel with a pool of worker processes.

    Args:
        url_list: Iterable of URLs to resolve.
        mappings: Optional dict of already-resolved URLs. It is handed to
            every ``Unshortener`` as its cache and is updated in place with
            each result as it arrives. When omitted, a fresh dict is used.
        pool_size: Number of worker processes, and of ``Unshortener``
            instances created up front.

    Returns:
        A dict mapping every URL from ``url_list`` to its resolved value.
        Empty input returns ``{}`` without creating any unshortener or pool.
    """
    if mappings is None:
        mappings = {}

    # Materialise once so views and one-shot iterables behave alike.
    urls = list(url_list)
    if not urls:
        # Nothing to do: skip the resolver handshakes and the pool start-up.
        return {}

    # One unshortener per worker slot; URLs are dealt out round-robin.
    unshorteners = [Unshortener(mappings) for _ in range(pool_size)]
    args = [(url, unshorteners[idx % pool_size]) for idx, url in enumerate(urls)]

    specific_results = {}
    with multiprocessing.Pool(pool_size) as pool:
        # Results arrive in completion order, so each carries its own URL.
        # Arguments are shipped to the worker processes, so cache writes made
        # inside a worker are not visible here; merge the returned pairs.
        for url, resolved in tqdm.tqdm(pool.imap_unordered(func, args), total=len(args)):
            mappings[url] = resolved
            specific_results[url] = resolved
    return specific_results
# Path of the JSON file the resolved mappings are persisted to.
mappings_file = 'data/mappings.json'
# Module-level cache of resolved URLs; written out by signal_handler.
mappings = {}


def signal_handler(sig, frame):
    """Persist the module-level ``mappings`` to ``mappings_file`` and exit.

    Has the ``(signum, frame)`` signature expected by ``signal.signal``;
    neither argument is used.

    Raises:
        SystemExit: Always, with status 0, after the file has been written.
    """
    print('You pressed Ctrl+C!')
    with open(mappings_file, 'w') as f:
        # Do not assign the result back to ``mappings``: an assignment would
        # make the name local to this function, so reading it would raise
        # UnboundLocalError after ``open`` had already truncated the file.
        json.dump(mappings, f, indent=2)
    # NOTE(review): worker processes created by fork after this handler is
    # installed presumably inherit it and would also write this file - confirm.
    sys.exit(0)
if __name__ == "__main__":
    # Script entry point: resolve every key of data/aggregated_urls.json and
    # persist the results to mappings_file, reusing earlier results if present.
    with open('data/aggregated_urls.json') as f:
        data = json.load(f)
    urls = list(data)

    if os.path.isfile(mappings_file):
        with open(mappings_file) as f:
            mappings = json.load(f)
    print('already mappings', len(mappings))

    # Install the Ctrl+C handler only once the existing cache is in memory;
    # if it fired earlier it would overwrite the saved file with an empty dict.
    signal.signal(signal.SIGINT, signal_handler)

    try:
        unshorten_multiprocess(urls, mappings)
    except Exception as e:
        # Best effort: report what went wrong, then still save what we have.
        print('gotcha', repr(e))

    with open(mappings_file, 'w') as f:
        # json.dump returns None, so its result must not be bound to mappings.
        json.dump(mappings, f, indent=2)