-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscraper.py
More file actions
260 lines (209 loc) · 9.08 KB
/
Copy pathscraper.py
File metadata and controls
260 lines (209 loc) · 9.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
"""
Web scraper to extract email addresses and phone numbers from school websites.
"""
import re
import requests
from bs4 import BeautifulSoup
from typing import Dict, Optional, Tuple
from urllib.parse import urljoin, urlparse
import time
def extract_emails(text: str) -> list:
    """
    Extract email addresses from text using regex.

    Addresses on placeholder domains (example.com, test.com, domain.com) and
    addresses containing "noreply", "no-reply" or "donotreply" are dropped.
    Duplicates are removed case-insensitively; the first spelling seen is
    the one that is returned.

    Args:
        text: Text to search for emails

    Returns:
        List of unique email addresses in order of first appearance
    """
    if not text:
        return []

    # Email regex pattern. The TLD class is [A-Za-z]: inside a character
    # class "|" is a literal pipe, not alternation, so "[A-Z|a-z]" would let
    # a "|" leak into the matched address.
    email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'

    # Common non-contact emails, checked against the lowercased address
    exclude_patterns = [
        r'.*@example\.com',
        r'.*@test\.com',
        r'.*@domain\.com',
        r'.*noreply.*',
        r'.*no-reply.*',
        r'.*donotreply.*',
    ]

    seen = set()
    unique_emails = []
    for email in re.findall(email_pattern, text):
        email_lower = email.lower()
        if email_lower in seen:
            continue
        # fullmatch anchors the domain patterns at the end of the address, so
        # "@example.company" is not mistaken for "@example.com".
        if any(re.fullmatch(pattern, email_lower) for pattern in exclude_patterns):
            continue
        seen.add(email_lower)
        unique_emails.append(email)
    return unique_emails
def extract_phone_numbers(text: str) -> list:
    """
    Extract phone numbers from text using regex.
    Supports Dutch phone number formats.

    A candidate is kept only when it contains 8-15 digits and is not a slice
    of a longer run of digits. Duplicates are detected after removing
    separators (dash, dot, whitespace, parentheses); the first spelling found
    is returned exactly as it appeared in the text.

    Args:
        text: Text to search for phone numbers

    Returns:
        List of unique phone numbers, grouped by pattern (international,
        national, with parentheses) and in text order within each group
    """
    if not text:
        return []

    # Dutch phone number patterns
    # Matches: 020-1234567, 020 1234567, 0201234567, +31 20 1234567, etc.
    # (?<!\d) and (?!\d) stop a pattern from matching a slice of a longer
    # digit sequence such as an order number or timestamp.
    phone_patterns = [
        r'\+31\s?[1-9]\d{1,3}\s?[-.\s]?\d{1,4}\s?[-.\s]?\d{4,6}(?!\d)',  # International format
        r'(?<!\d)0[1-9]\d{0,2}[-.\s]?\d{1,4}[-.\s]?\d{4,6}(?!\d)',  # National format
        r'\(0[1-9]\d{0,2}\)\s?\d{1,4}[-.\s]?\d{4,6}(?!\d)',  # With parentheses
    ]

    seen = set()
    unique_numbers = []
    for pattern in phone_patterns:
        for match in re.findall(pattern, text):
            phone = match.strip()
            # Keep only if it's a reasonable length (8-15 digits)
            if not 8 <= len(re.sub(r'\D', '', phone)) <= 15:
                continue
            # Separator-free form is the duplicate key; the original
            # formatting is what gets returned.
            normalized = re.sub(r'[-.\s()]', '', phone)
            if normalized in seen:
                continue
            seen.add(normalized)
            unique_numbers.append(phone)
    return unique_numbers
def scrape_contact_info(website: str, timeout: int = 10) -> Dict[str, list]:
    """
    Scrape a website for email addresses and phone numbers.

    The page is fetched with a browser-like User-Agent and contacts are
    collected from three places, in this order: the text of the whole page,
    the targets of ``mailto:`` / ``tel:`` links, and the text of div/section/
    footer elements whose class contains "contact", "footer" or "info".
    Progress messages are printed to stdout.

    Args:
        website: URL of the website to scrape. A value without a scheme
            (e.g. "www.school.nl") is requested over https.
        timeout: Request timeout in seconds

    Returns:
        Dictionary with 'emails' and 'phone_numbers' lists, plus an 'error'
        entry that is None on success or a message string on failure.
        Request and parsing exceptions are caught and reported through
        'error' instead of being raised.
    """
    result = {
        'emails': [],
        'phone_numbers': [],
        'error': None
    }

    try:
        # requests rejects URLs without a scheme, so default bare hosts to https
        website = website.strip()
        if not re.match(r'[a-z][a-z0-9+.-]*://', website, re.I):
            website = 'https://' + website.lstrip('/')

        print(f" -> Connecting to {website}...", flush=True)

        # Set headers to mimic a browser
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }

        # Make request
        print(f" -> Sending HTTP request...", flush=True)
        response = requests.get(website, headers=headers, timeout=timeout, allow_redirects=True)
        response.raise_for_status()
        print(f" -> Response received (status: {response.status_code})", flush=True)

        # Parse HTML
        print(f" -> Parsing HTML content...", flush=True)
        soup = BeautifulSoup(response.text, 'html.parser')

        # Get all text content
        text_content = soup.get_text()
        print(f" -> Extracted {len(text_content)} characters of text", flush=True)

        # Extract emails and phone numbers
        print(f" -> Searching for email addresses and phone numbers...", flush=True)
        emails = extract_emails(text_content)
        phone_numbers = extract_phone_numbers(text_content)
        print(f" -> Found {len(emails)} email(s) and {len(phone_numbers)} phone number(s) in main content", flush=True)

        # Duplicate keys: emails compare case-insensitively, phones compare
        # with separators removed, so the same contact written differently
        # in a link or footer is not added twice.
        seen_emails = {email.lower() for email in emails}
        seen_phones = {re.sub(r'[-.\s()]', '', phone) for phone in phone_numbers}

        def add_email(email):
            """Append email if it is non-empty and new; return True when added."""
            key = email.lower()
            if not email or key in seen_emails:
                return False
            seen_emails.add(key)
            emails.append(email)
            return True

        def add_phone(phone):
            """Append phone if it is non-empty and new; return True when added."""
            key = re.sub(r'[-.\s()]', '', phone)
            if not phone or key in seen_phones:
                return False
            seen_phones.add(key)
            phone_numbers.append(phone)
            return True

        # Also check common contact sections (links, specific elements)
        print(f" -> Checking mailto and tel links...", flush=True)

        # Look for mailto links. The scheme is stripped case-insensitively
        # because the link search itself ignores case ("MAILTO:" also matches).
        mailto_count = 0
        for link in soup.find_all('a', href=re.compile(r'^mailto:', re.I)):
            href = link.get('href', '')
            # Drop the scheme and any "?subject=..." query part
            email = re.sub(r'^mailto:', '', href, flags=re.I).split('?')[0].strip()
            if add_email(email):
                mailto_count += 1

        # Look for tel links
        tel_count = 0
        for link in soup.find_all('a', href=re.compile(r'^tel:', re.I)):
            href = link.get('href', '')
            phone = re.sub(r'^tel:', '', href, flags=re.I).strip()
            if add_phone(phone):
                tel_count += 1

        if mailto_count > 0 or tel_count > 0:
            print(f" -> Found {mailto_count} email(s) and {tel_count} phone(s) in links", flush=True)

        # Look in specific elements that often contain contact info
        print(f" -> Checking contact/footer sections...", flush=True)
        contact_elements = soup.find_all(['div', 'section', 'footer'],
                                         class_=re.compile(r'contact|footer|info', re.I))
        contact_section_count = 0
        for element in contact_elements:
            element_text = element.get_text()
            for email in extract_emails(element_text):
                if add_email(email):
                    contact_section_count += 1
            for phone in extract_phone_numbers(element_text):
                if add_phone(phone):
                    contact_section_count += 1

        if contact_section_count > 0:
            print(f" -> Found {contact_section_count} additional contact(s) in special sections", flush=True)

        result['emails'] = emails
        result['phone_numbers'] = phone_numbers
        print(f" -> Scraping complete: {len(emails)} total email(s), {len(phone_numbers)} total phone(s)", flush=True)
    except requests.exceptions.RequestException as e:
        result['error'] = f"Request error: {str(e)}"
        print(f" -> [ERROR] Request failed: {str(e)}", flush=True)
    except Exception as e:
        # Best-effort boundary: any parsing failure is reported to the
        # caller through result['error'] rather than aborting a batch run.
        result['error'] = f"Scraping error: {str(e)}"
        print(f" -> [ERROR] Scraping failed: {str(e)}", flush=True)

    return result
def scrape_school_contact_info(school_data: Dict, delay: float = 1.0) -> Dict:
    """
    Scrape contact information for a school and update the school data.

    The dictionary is modified in place and also returned. These keys are
    always set: 'scraped_emails', 'scraped_phone_numbers', 'scraped_email',
    'scraped_phone' and 'scrape_error'. When the school has no website, no
    request is made and no delay is applied.

    Args:
        school_data: Dictionary containing school information including 'website'
        delay: Delay between requests in seconds (to be polite)

    Returns:
        Updated school data dictionary with scraped contact info
    """
    website = school_data.get('website', '')
    school_name = school_data.get('school_name', 'Unknown')

    if not website:
        # Set the same keys as the scraping path so every returned record
        # has one schema, whether or not a website was available.
        school_data['scraped_emails'] = []
        school_data['scraped_phone_numbers'] = []
        school_data['scraped_email'] = None
        school_data['scraped_phone'] = None
        school_data['scrape_error'] = "No website URL provided"
        print(f" -> [SKIP] No website URL for {school_name}", flush=True)
        return school_data

    print(f" -> Starting scrape for: {school_name}", flush=True)

    # Scrape the website
    contact_info = scrape_contact_info(website)
    emails = contact_info.get('emails') or []
    phone_numbers = contact_info.get('phone_numbers') or []

    # Update school data with scraped information
    school_data['scraped_emails'] = emails
    school_data['scraped_phone_numbers'] = phone_numbers
    school_data['scrape_error'] = contact_info.get('error')

    # Set primary email (first one found, or None)
    school_data['scraped_email'] = emails[0] if emails else None

    # Set primary phone (first one found, or use CSV phone if available)
    school_data['scraped_phone'] = (
        phone_numbers[0] if phone_numbers else school_data.get('phone_csv')
    )

    # Add delay to be polite to servers
    if delay > 0:
        print(f" -> Waiting {delay} seconds before next request...", flush=True)
        time.sleep(delay)

    return school_data
if __name__ == "__main__":
    # Manual smoke test: scrape one sample record and print the updated dict.
    sample_school = dict(
        school_name='Test School',
        website='https://www.example.com',
    )
    scraped = scrape_school_contact_info(sample_school)
    print(f"Scraped info: {scraped}")