-
Notifications
You must be signed in to change notification settings - Fork 108
/
Copy pathpsi.py
69 lines (53 loc) · 1.83 KB
/
psi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#!/usr/local/bin/python3
# -*- coding: utf-8 -*-
'''PageSpeed Insights Single + Google Cloud Functions'''
import os
import base64
from urllib import parse
import requests
from google.cloud import storage
from google.cloud.storage import Blob
# Access Token, generated from GCP Console Credentials page.
API_KEY = os.getenv('GCP_API_KEY')
GAPI_PSI = "https://www.googleapis.com/pagespeedonline/v5/runPagespeed"
SESSION = requests.Session()
PROXIES = None
def save(url, report):
'''Save to https://console.cloud.google.com/storage/browser/[bucket-id]/'''
client = storage.Client()
bucket = client.get_bucket("psi-report")
blob = Blob(f"${parse.quote_plus(url)}.json", bucket)
blob.upload_from_string(report, "application/json")
def run(url):
try:
payload = {"url": url,
"category": "performance",
"locale": "zh-CN",
"strategy": "mobile",
"key": API_KEY
}
response = SESSION.get(url=GAPI_PSI, params=payload, proxies=PROXIES)
print(response.status_code)
if 200 == response.status_code:
save(url, response.text)
except requests.RequestException as _e:
print(_e)
return 'OK'
def run_pubsub(event, context):
pubsub_message = base64.urlsafe_b64decode(event['data']).decode('utf-8')
return run(pubsub_message)
def test_run_http(test_url):
run(test_url)
def test_run_pubsub(test_url):
event = {"data": base64.urlsafe_b64encode(test_url.encode('utf-8'))}
context = None
run_pubsub(event, context)
if __name__ == "__main__":
_proxy = os.getenv("HTTP_PROXY")
PROXIES = {
"http": _proxy,
"https": _proxy,
}
_test_url = "https://m.ctrip.com/webapp/flight/schedule/detail.html"
test_run_http(_test_url)
test_run_pubsub(_test_url)