Adding reports #3409

Open · wants to merge 1 commit into base: main
Binary file added .DS_Store
Binary file not shown.
111 changes: 111 additions & 0 deletions reports/get_all_extensions.py
@@ -0,0 +1,111 @@
"""
Script to collect metadata on all published extensions. Requires an access token
from https://open-vsx.org/user-settings/tokens. Used by Jupyter notebooks.
"""
import requests
import os
import json
import time
from datetime import datetime

API_ENDPOINT = "https://open-vsx.org/"

ACCESS_TOKEN = os.getenv('ACCESS_TOKEN')
JSON_FILENAME = 'extensions.json'
TSV_FILENAME = 'extensions.tsv'

url = API_ENDPOINT + 'api/-/search?size=100&token=%s' % ACCESS_TOKEN
Contributor: The search endpoint is public. You don't need a token.
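For reference, a minimal sketch of a tokenless search call, reusing the endpoint and parameters from the script (this is an illustration, not part of the PR's code):

import requests

# The search endpoint responds without authentication, per the comment above.
response = requests.get('https://open-vsx.org/api/-/search', params={'size': 100, 'offset': 0})
print('%s extensions available' % response.json()['totalSize'])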


def get_all_extensions():
    """Page through the search API until every published extension has been retrieved."""
    extensions = []
    done = False
    offset = 0
    while not done:
        search_url = url + '&offset=%s' % offset
        try:
            response = requests.get(search_url)
            results = response.json()
            extensions.extend(results['extensions'])
            offset = len(extensions)
            print('Retrieved %s extensions' % len(extensions))
            if len(extensions) == results['totalSize']:
                done = True
        except Exception as e:
            # Give up on the first error; whatever was collected so far is still used.
            print("%s: %s" % (datetime.now(), e))
            done = True

    count = 1
    all_extensions = []
    print("\n\nStarting: %s" % datetime.now())
    for extension in extensions:
        # Fetch the full metadata record for each extension found by the search.
        namespace_url = API_ENDPOINT + 'api/%s/%s?token=%s' % (extension['namespace'], extension['name'], ACCESS_TOKEN)
Contributor: The extension endpoint is public. You don't need a token.
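Likewise, a hedged sketch of a tokenless metadata lookup for a single extension; the namespace/name pair here is only an example of the api/{namespace}/{name} path the script builds:

import requests

# Public metadata lookup for one extension; no token needed, per the comment above.
response = requests.get('https://open-vsx.org/api/redhat/java')
print(response.json().get('downloadCount'))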

        retry_count = 5
        while retry_count > 0:
            try:
                response = requests.get(namespace_url)
                if response.status_code == 200:
                    break
                else:
                    raise Exception('%s: HTTP %s Error retrieving %s' % (datetime.now(), response.status_code, extension['url']))
            except Exception as e:
                print("%s: %s" % (datetime.now(), e))
                retry_count -= 1
                time.sleep(2)
        if retry_count == 0:
            print('Error retrieving %s' % extension['url'])
        else:
            results = response.json()
            all_extensions.append(results)
        if count % 100 == 0:
            print('Processed %s of %s.' % (count, len(extensions)))
        count += 1
    print("\n\nFinished %s API Calls: %s" % (count - 1, datetime.now()))

    return all_extensions

def get_all_by_license():
    """Group all extensions by their license identifier."""
    extensions_by_license = {}
    all_extensions = get_all_extensions()
    count = 1
    for extension in all_extensions:
        license = extension.get('license', 'None')
        extensions_by_license.setdefault(license, []).append(extension)
        if count % 100 == 0:
            print('Processed %s of %s.' % (count, len(all_extensions)))
        count += 1

    return dict(sorted(extensions_by_license.items()))

def write_json_file(extensions):
    """Dump the full extension records to a JSON file."""
    with open(JSON_FILENAME, 'w') as f:
        f.write(json.dumps(extensions, indent=4))

def write_tsv_file(extensions):
    """Write a tab-separated summary with one row per extension."""
    with open(TSV_FILENAME, 'w') as f:
        columns = "Name\tNamespace\tVersions\tLogin Name\t"
        columns = columns + "Full Name\tLicense\tTimestamp\tDownloads\tReviews\tFiles\t"
        columns = columns + "PreRelease\tVerified\tUnrelated Publisher\tNamespace Access\tPreview\t"
        columns = columns + "Homepage\tRepo\tBugs\tDependencies\n"
        f.write(columns)
        for e in extensions:
            row = "%s\t%s\t%s\t%s" % (e['name'], e['namespace'], len(e['allVersions']), e['publishedBy']['loginName'])
            row = "%s\t%s\t%s\t%s\t%s\t%s\t%s" % (row, e['publishedBy'].get('fullName', 'None'), e.get('license', 'None'), e['timestamp'], e['downloadCount'], e['reviewCount'], len(e['files']))
            row = "%s\t%s\t%s\t%s\t%s\t%s" % (row, e['preRelease'], e['verified'], e['unrelatedPublisher'], e['namespaceAccess'], e['preview'])
            row = "%s\t%s\t%s\t%s\t%s\n" % (row, e.get('homepage', 'None'), e.get('repository', 'None'), e.get('bugs', 'None'), len(e['dependencies']))
            f.write(row)

if __name__ == '__main__':
extensions = get_all_extensions()
write_json_file(extensions)
write_tsv_file(extensions)
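The docstring says these output files feed Jupyter notebooks; a minimal sketch of loading them there might look like the following (assuming pandas is available in the notebook; the notebook code itself is not part of this PR):

import json
import pandas as pd

# Load the outputs written by this script (filenames from the constants above).
with open('extensions.json') as f:
    extensions = json.load(f)

# The TSV is tab-separated with the header row from write_tsv_file().
df = pd.read_csv('extensions.tsv', sep='\t')
print(df[['Name', 'Downloads']].sort_values('Downloads', ascending=False).head())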





131 changes: 131 additions & 0 deletions reports/get_availability_data.py
@@ -0,0 +1,131 @@
"""
Script to collect availability data from open-vsx endpoints monitored by
Better Uptime. Used by the graph_availability_trends Jupyter notebook.
Requires an access token from the IT team.
"""
import requests
from datetime import datetime, timedelta
import numpy as np
import os
import calendar
import time

API_URL = 'https://betteruptime.com/api/v2'
TOKEN = os.getenv('TOKEN')
HEADERS = {'Authorization': 'Bearer %s' % TOKEN}

def make_api_call(url):
    """GET a Better Uptime API URL, retrying with increasing backoff on failure."""
    retry_count = 5
    done = False
    while not done:
        try:
            response = requests.get(url, headers=HEADERS)
            if response.status_code != 200:
                raise Exception('%s HTTP error %s' % (url, response.status_code))
            else:
                done = True
        except Exception as e:
            print("  %s, retrying..." % e)
            if retry_count > 0:
                # Back off a little longer on each successive retry.
                time.sleep((6 - retry_count) * 5)
                retry_count = retry_count - 1
            else:
                raise Exception('Failing call to %s after multiple retries' % url)

    return response.json()

def get_all_monitors():
    """Return every Better Uptime monitor that watches an open-vsx.org URL."""
    all_openvsx_monitors = []
    all_monitors_url = '%s/monitors' % API_URL
    done = False
    while not done:
        json_results = make_api_call(all_monitors_url)
        for monitor in json_results['data']:
            if 'https://open-vsx.org' in monitor['attributes']['url']:
                all_openvsx_monitors.append(monitor)
        # Follow pagination links until there is no next page.
        next_page = json_results['pagination'].get('next')
        if next_page is None:
            done = True
        else:
            all_monitors_url = next_page
    return all_openvsx_monitors

def get_monitor_data(monitor, time_span):
    """Collect rolling-window SLA and single-day downtime figures for one monitor."""
    monitor_id = monitor['id']
    name = monitor['attributes']['pronounceable_name']
    date_str = monitor['attributes']['created_at']
    start_date = datetime.strptime(date_str[0:10], '%Y-%m-%d')
    end_date = start_date + timedelta(days=time_span)
    today = datetime.now()
    dates = []
    sla_data = []
    downtime_data = []
    print('processing %s' % name)
    while end_date <= today:
        # Availability over the rolling time_span-day window ending on end_date.
        availability_url = '%s/monitors/%s/sla?from=%s&to=%s' % (API_URL, monitor_id, start_date.strftime('%Y-%m-%d'), end_date.strftime('%Y-%m-%d'))
        json_results = make_api_call(availability_url)
        dates.append(np.datetime64(end_date.strftime('%Y-%m-%d')))
        sla_data.append(json_results['data']['attributes']['availability'])
        # Total downtime (converted to minutes) for the single day at the start of the window.
        downtime_url = '%s/monitors/%s/sla?from=%s&to=%s' % (API_URL, monitor_id, start_date.strftime('%Y-%m-%d'), start_date.strftime('%Y-%m-%d'))
        json_results = make_api_call(downtime_url)
        downtime_data.append(json_results['data']['attributes']['total_downtime']/60)
        start_date = start_date + timedelta(days=1)
        end_date = end_date + timedelta(days=1)
    print('finished processing')
    return name, dates, sla_data, downtime_data

def get_continuous_data(time_span=30):
    """Collect rolling-window data for every open-vsx monitor."""
    monitors = get_all_monitors()
    results = []
    for monitor in monitors:
        name, dates, sla_data, downtime_data = get_monitor_data(monitor, time_span=time_span)
        results.append({'name': name,
                        'dates': dates,
                        'sla_data': sla_data,
                        'downtime_data': downtime_data})
    return results

def get_monthly_monitor_data(monitor):
    """Collect calendar-month SLA and downtime figures for one monitor."""
    monitor_id = monitor['id']
    name = monitor['attributes']['pronounceable_name']
    date_str = monitor['attributes']['created_at']
    interval_start_date = datetime.strptime(date_str[0:10], '%Y-%m-%d')
    end_date = datetime.now()
    dates = []
    sla_data = []
    downtime_data = []
    print('processing %s' % name)
    while interval_start_date < end_date:
        # Advance to the last day of the current calendar month.
        interval_days_in_month = calendar.monthrange(interval_start_date.year, interval_start_date.month)[1]
        interval_end_date = interval_start_date + timedelta(days=interval_days_in_month - interval_start_date.day)
        availability_url = '%s/monitors/%s/sla?from=%s&to=%s' % (API_URL, monitor_id, interval_start_date.strftime('%Y-%m-%d'), interval_end_date.strftime('%Y-%m-%d'))
        json_results = make_api_call(availability_url)
        dates.append(np.datetime64(interval_start_date.strftime('%Y-%m')))
        # The same SLA response carries both availability and total_downtime,
        # so one call per interval is enough.
        sla_data.append(json_results['data']['attributes']['availability'])
        downtime_data.append(json_results['data']['attributes']['total_downtime']/60)
        interval_start_date = interval_end_date + timedelta(days=1)

    print('finished processing')
    return name, dates, sla_data, downtime_data

def get_monthly_data():
    """Collect calendar-month availability data for every open-vsx monitor."""
    monitors = get_all_monitors()
    results = []
    for monitor in monitors:
        name, dates, sla_data, downtime_data = get_monthly_monitor_data(monitor)
        results.append({'name': name,
                        'dates': dates,
                        'sla_data': sla_data,
                        'downtime_data': downtime_data})
    return results


if __name__ == '__main__':
results = get_monthly_data()
print(results)
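A hedged sketch of how the graph_availability_trends notebook might consume this structure (assuming matplotlib; the plotting code itself is not part of this PR):

import matplotlib.pyplot as plt

# One availability line per monitored open-vsx endpoint, using the
# 'dates'/'sla_data' lists from the results structure returned above.
for monitor in results:
    plt.plot(monitor['dates'], monitor['sla_data'], label=monitor['name'])
plt.ylabel('Availability (%)')
plt.legend()
plt.show()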
