forked from Udzu/pudzu
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathwikipage.py
More file actions
238 lines (195 loc) · 9.83 KB
/
wikipage.py
File metadata and controls
238 lines (195 loc) · 9.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
from math import log
from statistics import mean, median
from urllib.parse import unquote
import bs4
import lxml
import requests
from utils import *
from tureen import *
from dates import *
requests_cache = optional_import("requests_cache")
# Utilities for scraping with Wikipedia and WikiData.
logger = logging.getLogger('wikipage')
class CachedPage():
"""Base class representing a page loaded from an optional requests cache. To enable cacheing, call set_cache on the derived class."""
CACHE = requests
HEADERS = {'User-Agent': 'Mozilla/5.0'}
@classmethod
def set_cache(cls, cache_path, **kwargs):
cls.CACHE = requests if cache_path is None else requests_cache.CachedSession(cache_path, **kwargs)
class WikiPage(CachedPage):
"""Class representing a Wikipedia page."""
@staticmethod
def hostname_from_lang(lang):
return "{}.wikipedia.org".format(lang)
@classmethod
def url_from_title(cls, title, lang="en"):
"""The Wikipedia URL for a given page title."""
return "http://{}/wiki/{}".format(cls.hostname_from_lang(lang), title)
@staticmethod
def title_from_url(url):
"""The page title of a given Wikipedia URL. URL-decoded but underscores are not replaced with spaces."""
t = re.findall("/wiki/(.*)$", url)
return None if len(t) == 0 else unquote(t[0])
def __init__(self, title, lang="en"):
"""The Wikipedia page with a given title."""
self.request_title = title
self.lang = lang
self.hostname = self.hostname_from_lang(lang)
self.url = self.url_from_title(title, lang=lang)
self.response = self.CACHE.get(self.url, headers=self.HEADERS)
self.bs4 = bs4.BeautifulSoup(self.response.content, "lxml")
if self.bs4.find(id="t-permalink") is None:
raise KeyError("Wikipedia article not found for '{}' at {}".format(title, self.hostname))
def __repr__(self):
return "<WikiPage at {}{} [{}]>".format(self.title, "" if self.title == self.request_title else " (redirect from {})".format(self.request_title), self.lang)
@cached_property
def title(self):
"""The page title"""
return self.bs4.find(id="firstHeading").text
@cached_property
@ignoring_exceptions
def entity(self):
"""The page wikidata entity if it exists."""
return self.bs4.find(id="t-wikibase").a['href'].split('/')[-1]
@cached_property
@ignoring_exceptions
def image_url(self):
"""The page og:image URL if it exists."""
return self.bs4.find("meta", {"property": "og:image"})["content"]
# api calls
def pageviews(self, start, end, granularity="monthly", access="all-access", agent="all-agents"):
"""A JSON object representing the number of page views for a given period. See https://wikimedia.org/api/rest_v1/#!/Pageviews_data/get_metrics_pageviews_per_article_project_access_agent_article_granularity_start_end for details"""
def format_date(d):
if isinstance(d, Date): d = d.to_date()
if hasattr(d, 'strftime'): return d.strftime("%Y%m%d")
return d
pageview_api = "https://wikimedia.org/api/rest_v1/metrics/pageviews"
url = "{}/per-article/{}/{}/{}/{}/{}/{}/{}".format(pageview_api, self.hostname, access, agent, self.title, granularity, format_date(start), format_date(end))
return requests.get(url, headers=self.HEADERS).json().get('items', [])
def revision_count(self):
"""The total number of page revisions. May require repeated API calls."""
query_api = "https://{}/w/api.php".format(self.hostname)
parameters = { 'action': 'query', 'titles': self.title, 'prop': 'revisions', 'rvprop': '', 'rvlimit': 'max', 'format': 'json', 'continue': ''}
revisions = 0
while True:
response = requests.get(query_api, params=parameters).json()
for page_id in response['query']['pages']:
revisions += len(response['query']['pages'][page_id]['revisions'])
if 'continue' in response:
parameters['continue'] = response['continue']['continue']
parameters['rvcontinue'] = response['continue']['rvcontinue']
else:
break
return revisions
# the remaining methods are domain-specific
@staticmethod
def wiki_year_title(year, lang="en"):
if lang != "en": raise NotImplementedError
if year > 100: return "{}".format(year)
elif year >= 0: return "AD {}".format(year)
else: return "{} BC".format(-year)
@classmethod
def from_year(cls, year, lang="en"):
"""The Wikipedia page about a given historic year."""
return WikiPage(WikiPage.wiki_year_title(year, lang))
class WDPage(CachedPage):
"""Base class representing a wikidata entity."""
API = "https://www.wikidata.org/w/api.php"
def __init__(self, id, lang="en"):
"""A Wikidata entry with the given id."""
self.id = id
self.json = self.get_entity(self.id)
self.lang = lang
self.claims = self.json['entities'][id]['claims']
@classmethod
def from_name(cls, name, lang="en", property=False):
"""A Wikidata entry with the given name."""
id = cls.search_entity(name, property, lang)
if id is None:
raise KeyError("No entity found for {} ({})".format(name, lang))
return cls(id, lang)
@classmethod
def from_wikipedia(cls, title, lang="en"):
"""A Wikidata entry for the given Wikipedia page"""
wp = title if isinstance(title, WikiPage) else WikiPage(title, lang=lang)
return cls(wp.entity, lang)
# API calls
@classmethod
def api_call(cls, parameters):
"""Wikidata api call."""
logurl = "{}?{}".format(cls.API, "&".join("{}={}".format(k,v) for k,v in parameters.items()))
logger.info("WikiData API: {}".format(logurl))
json = cls.CACHE.get(cls.API, params=assoc_in(parameters, ['format'], 'json'), headers=cls.HEADERS).json()
if 'error' in json:
raise Exception("WikiData API error for {}: {}".format(logurl, json['error'].get('info', '(no info)')))
return json
@classmethod
def get_entity(cls, id):
"""Return claims and labels for a given entity"""
parameters = { 'action' : 'wbgetentities', 'ids': id, 'props': 'claims|labels', 'format': 'json' }
return cls.api_call(parameters)
@classmethod
def search_entity(cls, name, property=False, lang="en", precise=True):
"""Returns the entity codes for a given search."""
type = 'property' if property else 'item'
parameters = { 'action': 'wbsearchentities', 'search': name, 'language': lang, 'type': type, 'format': 'json' }
results = cls.api_call(parameters).get('search',[])
if precise: results = [ d for d in results if d['match']['text'].lower() == name.lower() ]
ids = [ d['id'] for d in results ]
return first_or_none(ids) if precise else ids
# methods
def __repr__(self):
return "<WDPage at {} ({})>".format(self.id, self.name())
def __eq__(self, other):
if isinstance(other, WDPage):
return self.id == other.id
else:
return NotImplemented
def name(self, lang=None):
"""Wikidata entity name"""
return self.json['entities'][self.id]['labels'].get(lang or self.lang, {'value': '(unknown)'})['value']
@staticmethod
def convert_value(value):
"""Convert wikidata value to an appropriate Python type. Currently handles just string, entity and time, and even those not perfectly."""
if 'time' in value:
year = int(value['time'][0:5])
month = int(value['time'][6:8])
day = int(value['time'][9:11])
date = tuple(x for x in (year, month, day) if x != 0)
precision = DatePrecision(11 - int(value['precision']))
return ApproximateDate(date, precision)
elif 'id' in value:
return WDPage(value['id'])
else:
return value
def property_values(self, property, qualifier_filter=lambda qs:True, convert=True):
"""Basic property value extractor. For more complex queries, use self.claims directly for now."""
id = property.id if isinstance(property, WDPage) else property
convert = self.convert_value if convert else identity
return [convert(claim['mainsnak']['datavalue']['value'])
for claim in self.claims.get(id, [])
if claim['mainsnak']['snaktype'] == 'value'
and qualifier_filter(claim.get('qualifiers', {}))]
# the remaining methods are domain-specific
COUNTRY = 'P17'
PLACE_OF_BIRTH = 'P19'
DATE_OF_BIRTH = 'P569'
END_TIME = 'P582'
@cached_property
def dates_of_birth(self):
"""Dates of birth."""
return sorted(self.property_values(self.DATE_OF_BIRTH))
@cached_property
def places_of_birth(self):
"""Places of birth."""
return sorted(self.property_values(self.PLACE_OF_BIRTH))
@cached_property
def countries_of_birth(self):
"""Countries of birth (based on modern borders)."""
cobs = []
for pob in self.places_of_birth:
for cob in pob.property_values(self.COUNTRY, lambda qs: self.END_TIME not in qs):
cobs.append(cob)
return list(remove_duplicates(cobs, lambda wd: wd.id))
WDProp = partial(WDPage.from_name, property=True)