diff --git a/api/dis_plots.py b/api/dis_plots.py index ae7fade..258b5fb 100644 --- a/api/dis_plots.py +++ b/api/dis_plots.py @@ -1,6 +1,6 @@ -''' dis_plots.py - Plot functions for the DIS UI -''' +"""dis_plots.py +Plot functions for the DIS UI +""" from math import pi import pandas as pd @@ -13,86 +13,101 @@ SOURCE3_PALETTE = ["mediumblue", "darkorange", "wheat"] TYPE_PALETTE = ["mediumblue", "darkorange", "wheat", "darkgray"] + # ****************************************************************************** # * Utility functions * # ****************************************************************************** def _preprint_type_piechart(coll, year): - ''' Create a preprint type pie chart - Keyword arguments: - coll: dois collection - year: year or "All" - Returns: - Chart components - ''' + """Create a preprint type pie chart + Keyword arguments: + coll: dois collection + year: year or "All" + Returns: + Chart components + """ match = {"type": "posted-content"} - if year != 'All': - match['jrc_publishing_date'] = {"$regex": "^"+ year} - payload = [{"$match": match}, - {"$group": {"_id": {"institution": "$institution"},"count": {"$sum": 1}}}] + if year != "All": + match["jrc_publishing_date"] = {"$regex": "^" + year} + payload = [ + {"$match": match}, + {"$group": {"_id": {"institution": "$institution"}, "count": {"$sum": 1}}}, + ] try: rows = coll.aggregate(payload) except Exception as err: raise err data = {} for row in rows: - if not row['_id']['institution']: - data['No institution'] = row['count'] + if not row["_id"]["institution"]: + data["No institution"] = row["count"] else: - data[row['_id']['institution'][0]['name']] = row['count'] + data[row["_id"]["institution"][0]["name"]] = row["count"] if not data: return None, None title = "Preprint DOI institutions" - if year != 'All': + if year != "All": title += f" ({year})" - return pie_chart(dict(sorted(data.items())), title, - "source", width=600, height=400, location='bottom_right') + return pie_chart( + 
dict(sorted(data.items())), + title, + "source", + width=600, + height=400, + location="bottom_right", + ) def _preprint_capture_piechart(coll, year): - ''' Create a preprint capture pie chart - Keyword arguments: - coll: dois collection - year: year or "All" - Returns: - Chart components - ''' + """Create a preprint capture pie chart + Keyword arguments: + coll: dois collection + year: year or "All" + Returns: + Chart components + """ data = {} - payload = {"subtype": "preprint", "jrc_preprint": {"$exists": 1}, - "relation.is-preprint-of": {"$exists": 0}} - if year != 'All': - payload['jrc_publishing_date'] = {"$regex": "^"+ year} + payload = { + "subtype": "preprint", + "jrc_preprint": {"$exists": 1}, + "relation.is-preprint-of": {"$exists": 0}, + } + if year != "All": + payload["jrc_publishing_date"] = {"$regex": "^" + year} try: - data['Fuzzy matching'] = coll.count_documents(payload) + data["Fuzzy matching"] = coll.count_documents(payload) except Exception as err: raise err - del payload['relation.is-preprint-of'] + del payload["relation.is-preprint-of"] try: - data['Crossref relation'] = coll.count_documents(payload) + data["Crossref relation"] = coll.count_documents(payload) except Exception as err: raise err - data['Crossref relation'] = data['Crossref relation'] - data['Fuzzy matching'] - if not data['Crossref relation'] and not data['Fuzzy matching']: + data["Crossref relation"] = data["Crossref relation"] - data["Fuzzy matching"] + if not data["Crossref relation"] and not data["Fuzzy matching"]: return None, None title = "Preprint capture method" - if year != 'All': + if year != "All": title += f" ({year})" - return pie_chart(data, title, "source", colors=SOURCE_PALETTE, width=600, height=400) + return pie_chart( + data, title, "source", colors=SOURCE_PALETTE, width=600, height=400 + ) def preprint_pie_charts(data, year, coll): - ''' Create a preprint capture pie chart - Keyword arguments: - data: dictionary of data - year: year or "All" - coll: dois 
collection - Returns: - Chart components - ''' + """Create a preprint capture pie chart + Keyword arguments: + data: dictionary of data + year: year or "All" + coll: dois collection + Returns: + Chart components + """ title = "DOIs by preprint status" - if year != 'All': + if year != "All": title += f" ({year})" - chartscript, chartdiv = pie_chart(data, title, "source", - colors=SOURCE_PALETTE, width=600, height=400) + chartscript, chartdiv = pie_chart( + data, title, "source", colors=SOURCE_PALETTE, width=600, height=400 + ) # Preprint types try: script2, div2 = _preprint_type_piechart(coll, year) @@ -111,41 +126,60 @@ def preprint_pie_charts(data, year, coll): raise err return chartscript, chartdiv + # ****************************************************************************** # * Basic charts * # ****************************************************************************** -def pie_chart(data, title, legend, height=300, width=400, location="right", colors=None): - ''' Create a pie chart - Keyword arguments: - data: dictionary of data - title: chart title - legend: data key name - height: height of the chart (optional) - width: width of the chart (optional) - colors: list of colors (optional) - Returns: - Figure components - ''' + +def pie_chart( + data, title, legend, height=300, width=400, location="right", colors=None +): + """Create a pie chart + Keyword arguments: + data: dictionary of data + title: chart title + legend: data key name + height: height of the chart (optional) + width: width of the chart (optional) + colors: list of colors (optional) + Returns: + Figure components + """ if len(data) == 1: colors = ["mediumblue"] elif len(data) == 2: colors = SOURCE_PALETTE if not colors: - colors = all_palettes['Category10'][len(data)] + colors = all_palettes["Category10"][len(data)] elif isinstance(colors, str): print(colors) colors = all_palettes[colors][len(data)] - pdata = pd.Series(data).reset_index(name='value').rename(columns={'index': legend}) - 
pdata['angle'] = pdata['value']/pdata['value'].sum() * 2*pi - pdata['percentage'] = pdata['value']/pdata['value'].sum()*100 - pdata['color'] = colors + pdata = pd.Series(data).reset_index(name="value").rename(columns={"index": legend}) + pdata["angle"] = pdata["value"] / pdata["value"].sum() * 2 * pi + pdata["percentage"] = pdata["value"] / pdata["value"].sum() * 100 + pdata["color"] = colors tooltips = f"@{legend}: @value (@percentage%)" - plt = figure(title=title, toolbar_location=None, height=height, width=width, - tools="hover", tooltips=tooltips, x_range=(-0.5, 1.0)) - plt.wedge(x=0, y=1, radius=0.4, - start_angle=cumsum('angle', include_zero=True), end_angle=cumsum('angle'), - line_color="white", fill_color='color', legend_field=legend, source=pdata) + plt = figure( + title=title, + toolbar_location=None, + height=height, + width=width, + tools="hover", + tooltips=tooltips, + x_range=(-0.5, 1.0), + ) + plt.wedge( + x=0, + y=1, + radius=0.4, + start_angle=cumsum("angle", include_zero=True), + end_angle=cumsum("angle"), + line_color="white", + fill_color="color", + legend_field=legend, + source=pdata, + ) plt.axis.axis_label = None plt.axis.visible = False plt.grid.grid_line_color = None @@ -154,33 +188,36 @@ def pie_chart(data, title, legend, height=300, width=400, location="right", colo def stacked_bar_chart(data, title, xaxis, yaxis, colors=None, width=None, height=None): - ''' Create a stacked bar chart - Keyword arguments: - data: dictionary of data - title: chart title - xaxis: x-axis column name - yaxis: list of y-axis column names - colors: list of colors (optional) - width: width of chart (optional) - height: height of chart (optional) - Returns: - Figure components - ''' + """Create a stacked bar chart + Keyword arguments: + data: dictionary of data + title: chart title + xaxis: x-axis column name + yaxis: list of y-axis column names + colors: list of colors (optional) + width: width of chart (optional) + height: height of chart (optional) + Returns: 
+ Figure components + """ if not colors: colors = plasma(len(yaxis)) - plt = figure(x_range=data[xaxis], title=title, - toolbar_location=None, tools="hover", - tooltips=f"$name @{xaxis}: @$name") + plt = figure( + x_range=data[xaxis], + title=title, + toolbar_location=None, + tools="hover", + tooltips=f"$name @{xaxis}: @$name", + ) if width and height: plt.width = width plt.height = height - plt.vbar_stack(yaxis, x=xaxis, width=0.9, - color=colors, source=data, - legend_label=yaxis - ) - plt.legend.location = 'top_left' + plt.vbar_stack( + yaxis, x=xaxis, width=0.9, color=colors, source=data, legend_label=yaxis + ) + plt.legend.location = "top_left" if width and height: - plt.add_layout(plt.legend[0], 'right') + plt.add_layout(plt.legend[0], "right") plt.xgrid.grid_line_color = None plt.y_range.start = 0 plt.background_fill_color = "ghostwhite" diff --git a/api/dis_responder.py b/api/dis_responder.py index 03a2fb9..992915d 100644 --- a/api/dis_responder.py +++ b/api/dis_responder.py @@ -1,6 +1,6 @@ -''' dis_responder.py - UI and REST API for Data and Information Services -''' +"""dis_responder.py +UI and REST API for Data and Information Services +""" from datetime import date, datetime, timedelta from html import escape @@ -16,7 +16,7 @@ from time import time from bokeh.palettes import all_palettes, plasma import bson -from flask import (Flask, make_response, render_template, request, jsonify, send_file) +from flask import Flask, make_response, render_template, request, jsonify, send_file from flask_cors import CORS from flask_swagger import swagger import requests @@ -30,51 +30,60 @@ # Database DB = {} # Custom queries -CUSTOM_REGEX = {"publishing_year": {"field": "jrc_publishing_date", - "value": "^!REPLACE!"} - } +CUSTOM_REGEX = { + "publishing_year": {"field": "jrc_publishing_date", "value": "^!REPLACE!"} +} # Navigation -NAV = {"Home": "", - "DOIs": {"DOIs by insertion date": "dois_insertpicker", - "DOIs awaiting processing": "dois_pending", - "DOIs by 
publisher": "dois_publisher", - "DOIs by source": "dois_source", - "DOIs by year": "dois_year", - "DOIs by month": "dois_month", - "DOI yearly report": "dois_report" - }, - "Authorship": {"DOIs by authorship": "dois_author", - "DOIs with lab head first/last authors": "doiui_group"}, - "Preprints": {"DOIs by preprint status": "dois_preprint", - "DOIs by preprint status by year": "dois_preprint_year"}, - "Journals": {"Top journals": "dois_journal"}, - "ORCID": {"Groups": "groups", - "Entries": "orcid_entry", - "Duplicates": "orcid_duplicates", - }, - "Tag/affiliation": {"DOIs by tag": "dois_tag", - "Top DOI tags by year": "dois_top", - "Author affiliations": "orcid_tag", - }, - "Stats" : {"Database": "stats_database" - }, - "External systems": {"Search People system": "people", - "Supervisory Organizations": "orgs", - } - } +NAV = { + "Home": "", + "DOIs": { + "DOIs by insertion date": "dois_insertpicker", + "DOIs awaiting processing": "dois_pending", + "DOIs by publisher": "dois_publisher", + "DOIs by source": "dois_source", + "DOIs by year": "dois_year", + "DOIs by month": "dois_month", + "DOI yearly report": "dois_report", + }, + "Authorship": { + "DOIs by authorship": "dois_author", + "DOIs with lab head first/last authors": "doiui_group", + }, + "Preprints": { + "DOIs by preprint status": "dois_preprint", + "DOIs by preprint status by year": "dois_preprint_year", + }, + "Journals": {"Top journals": "dois_journal"}, + "ORCID": { + "Groups": "groups", + "Entries": "orcid_entry", + "Duplicates": "orcid_duplicates", + }, + "Tag/affiliation": { + "DOIs by tag": "dois_tag", + "Top DOI tags by year": "dois_top", + "Author affiliations": "orcid_tag", + }, + "Stats": {"Database": "stats_database"}, + "External systems": { + "Search People system": "people", + "Supervisory Organizations": "orgs", + }, +} # Sources # Dates -OPSTART = datetime.strptime('2024-05-16','%Y-%m-%d') +OPSTART = datetime.strptime("2024-05-16", "%Y-%m-%d") # 
****************************************************************************** # * Classes * # ****************************************************************************** + class CustomJSONEncoder(JSONEncoder): - ''' Define a custom JSON encoder - ''' + """Define a custom JSON encoder""" + def default(self, o): try: if isinstance(o, bson.objectid.ObjectId): @@ -96,8 +105,8 @@ def default(self, o): class InvalidUsage(Exception): - ''' Class to populate error return for JSON. - ''' + """Class to populate error return for JSON.""" + def __init__(self, message, status_code=400, payload=None): Exception.__init__(self) self.message = message @@ -105,21 +114,22 @@ def __init__(self, message, status_code=400, payload=None): self.payload = payload def to_dict(self): - ''' Build error response - ''' + """Build error response""" retval = dict(self.payload or ()) - retval['rest'] = {'status_code': self.status_code, - 'error': True, - 'error_text': f"{self.message}\n" \ - + f"An exception of type {type(self).__name__} occurred. " \ - + f"Arguments:\n{self.args}"} + retval["rest"] = { + "status_code": self.status_code, + "error": True, + "error_text": f"{self.message}\n" + + f"An exception of type {type(self).__name__} occurred. " + + f"Arguments:\n{self.args}", + } return retval class CustomException(Exception): - ''' Class to populate error return for HTML. - ''' - def __init__(self,message, preface=""): + """Class to populate error return for HTML.""" + + def __init__(self, message, preface=""): super().__init__(message) self.original = type(message).__name__ self.args = message.args @@ -141,22 +151,30 @@ def __init__(self,message, preface=""): @app.before_request def before_request(): - ''' Set transaction start time and increment counters. - If needed, initilize global variables. - ''' + """Set transaction start time and increment counters. + If needed, initilize global variables. 
+ """ if not DB: try: dbconfig = JRC.get_config("databases") except Exception as err: - return render_template('warning.html', urlroot=request.url_root, - title=render_warning("Config error"), message=err) + return render_template( + "warning.html", + urlroot=request.url_root, + title=render_warning("Config error"), + message=err, + ) dbo = attrgetter("dis.prod.write")(dbconfig) print(f"Connecting to {dbo.name} prod on {dbo.host} as {dbo.user}") try: - DB['dis'] = JRC.connect_database(dbo) + DB["dis"] = JRC.connect_database(dbo) except Exception as err: - return render_template('warning.html', urlroot=request.url_root, - title=render_warning("Database connect error"), message=err) + return render_template( + "warning.html", + urlroot=request.url_root, + title=render_warning("Database connect error"), + message=err, + ) app.config["START_TIME"] = time() app.config["COUNTER"] += 1 endpoint = request.endpoint if request.endpoint else "(Unknown)" @@ -166,28 +184,30 @@ def before_request(): return generate_response(result) return None + # ****************************************************************************** # * Error utility functions * # ****************************************************************************** + @app.errorhandler(InvalidUsage) def handle_invalid_usage(error): - ''' Error handler - Keyword arguments: - error: error object - ''' + """Error handler + Keyword arguments: + error: error object + """ response = jsonify(error.to_dict()) response.status_code = error.status_code return response def error_message(err): - ''' Create an error message from an exception - Keyword arguments: - err: exception - Returns: - Error message - ''' + """Create an error message from an exception + Keyword arguments: + err: exception + Returns: + Error message + """ if isinstance(err, CustomException): msg = f"{err.preface}\n" if err.preface else "" msg += f"An exception of type {err.original} occurred. 
Arguments:\n{err.args}" @@ -197,99 +217,120 @@ def error_message(err): def inspect_error(err, errtype): - ''' Render an error with inspection - Keyword arguments: - err: exception - Returns: - Error screen - ''' - mess = f"In {inspect.stack()[1][3]}, An exception of type {type(err).__name__} occurred. " \ - + f"Arguments:\n{err.args}" - return render_template('error.html', urlroot=request.url_root, - title=render_warning(errtype), message=mess) - - -def render_warning(msg, severity='error', size='lg'): - ''' Render warning HTML - Keyword arguments: - msg: message - severity: severity (warning, error, info, or success) - size: glyph size - Returns: - HTML rendered warning - ''' - icon = 'exclamation-triangle' - color = 'goldenrod' - if severity == 'error': - color = 'red' - elif severity == 'success': - icon = 'check-circle' - color = 'lime' - elif severity == 'info': - icon = 'circle-info' - color = 'blue' - elif severity == 'na': - icon = 'minus-circle' - color = 'gray' - elif severity == 'missing': - icon = 'minus-circle' - elif severity == 'no': - icon = 'times-circle' - color = 'red' - elif severity == 'warning': - icon = 'exclamation-circle' - return f"" \ - + f" {msg}" + """Render an error with inspection + Keyword arguments: + err: exception + Returns: + Error screen + """ + mess = ( + f"In {inspect.stack()[1][3]}, An exception of type {type(err).__name__} occurred. 
" + + f"Arguments:\n{err.args}" + ) + return render_template( + "error.html", + urlroot=request.url_root, + title=render_warning(errtype), + message=mess, + ) + + +def render_warning(msg, severity="error", size="lg"): + """Render warning HTML + Keyword arguments: + msg: message + severity: severity (warning, error, info, or success) + size: glyph size + Returns: + HTML rendered warning + """ + icon = "exclamation-triangle" + color = "goldenrod" + if severity == "error": + color = "red" + elif severity == "success": + icon = "check-circle" + color = "lime" + elif severity == "info": + icon = "circle-info" + color = "blue" + elif severity == "na": + icon = "minus-circle" + color = "gray" + elif severity == "missing": + icon = "minus-circle" + elif severity == "no": + icon = "times-circle" + color = "red" + elif severity == "warning": + icon = "exclamation-circle" + return ( + f"" + + f" {msg}" + ) + # ****************************************************************************** # * Navigation utility functions * # ****************************************************************************** + def generate_navbar(active): - ''' Generate the web navigation bar - Keyword arguments: - Navigation bar - ''' - nav = ''' + """Generate the web navigation bar + Keyword arguments: + Navigation bar + """ + nav = """ ' + nav += "" return nav + # ****************************************************************************** # * Payload utility functions * # ****************************************************************************** + def receive_payload(): - ''' Get a request payload (form or JSON). - Keyword arguments: - None - Returns: - payload dictionary - ''' + """Get a request payload (form or JSON). 
+ Keyword arguments: + None + Returns: + payload dictionary + """ pay = {} if not request.get_data(): return pay @@ -305,232 +346,274 @@ def receive_payload(): def initialize_result(): - ''' Initialize the result dictionary - Returns: - decoded partially populated result dictionary - ''' - result = {"rest": {"requester": request.remote_addr, - "url": request.url, - "endpoint": request.endpoint, - "error": False, - "elapsed_time": "", - "row_count": 0, - "pid": os.getpid()}} + """Initialize the result dictionary + Returns: + decoded partially populated result dictionary + """ + result = { + "rest": { + "requester": request.remote_addr, + "url": request.url, + "endpoint": request.endpoint, + "error": False, + "elapsed_time": "", + "row_count": 0, + "pid": os.getpid(), + } + } if app.config["LAST_TRANSACTION"]: - print(f"Seconds since last transaction: {time() - app.config['LAST_TRANSACTION']}") + print( + f"Seconds since last transaction: {time() - app.config['LAST_TRANSACTION']}" + ) app.config["LAST_TRANSACTION"] = time() return result def generate_response(result): - ''' Generate a response to a request - Keyword arguments: - result: result dictionary - Returns: - JSON response - ''' - result["rest"]["elapsed_time"] = str(timedelta(seconds=time() - app.config["START_TIME"])) + """Generate a response to a request + Keyword arguments: + result: result dictionary + Returns: + JSON response + """ + result["rest"]["elapsed_time"] = str( + timedelta(seconds=time() - app.config["START_TIME"]) + ) return jsonify(**result) def get_custom_payload(ipd, display_value): - ''' Get custom payload - Keyword arguments: - ipd: input payload dictionary - display_value: display value - Returns: - payload: payload for MongoDB find - ptitle: page title - ''' - if ipd['field'] in CUSTOM_REGEX: - rex = CUSTOM_REGEX[ipd['field']]['value'] - ipd['value'] = {"$regex": rex.replace("!REPLACE!", ipd['value'])} - ipd['field'] = CUSTOM_REGEX[ipd['field']]['field'] + """Get custom payload + 
Keyword arguments: + ipd: input payload dictionary + display_value: display value + Returns: + payload: payload for MongoDB find + ptitle: page title + """ + if ipd["field"] in CUSTOM_REGEX: + rex = CUSTOM_REGEX[ipd["field"]]["value"] + ipd["value"] = {"$regex": rex.replace("!REPLACE!", ipd["value"])} + ipd["field"] = CUSTOM_REGEX[ipd["field"]]["field"] ptitle = f"DOIs for {ipd['field']} {display_value}" - payload = {ipd['field']: ipd['value']} - if 'jrc_obtained_from' in ipd and ipd['jrc_obtained_from']: - payload['jrc_obtained_from'] = ipd['jrc_obtained_from'] + payload = {ipd["field"]: ipd["value"]} + if "jrc_obtained_from" in ipd and ipd["jrc_obtained_from"]: + payload["jrc_obtained_from"] = ipd["jrc_obtained_from"] ptitle += f" from {ipd['jrc_obtained_from']}" return payload, ptitle + # ****************************************************************************** # * ORCID utility functions * # ****************************************************************************** + def get_work_publication_date(wsumm): - ''' Get a publication date from an ORCID work summary - Keyword arguments: - wsumm: ORCID work summary - Returns: - Publication date - ''' - pdate = '' - if 'publication-date' in wsumm and wsumm['publication-date']: - ppd = wsumm['publication-date'] - if 'year' in ppd and ppd['year']['value']: - pdate = ppd['year']['value'] - if 'month' in ppd and ppd['month'] and ppd['month']['value']: + """Get a publication date from an ORCID work summary + Keyword arguments: + wsumm: ORCID work summary + Returns: + Publication date + """ + pdate = "" + if "publication-date" in wsumm and wsumm["publication-date"]: + ppd = wsumm["publication-date"] + if "year" in ppd and ppd["year"]["value"]: + pdate = ppd["year"]["value"] + if "month" in ppd and ppd["month"] and ppd["month"]["value"]: pdate += f"-{ppd['month']['value']}" - if 'day' in ppd and ppd['day'] and ppd['day']['value']: + if "day" in ppd and ppd["day"] and ppd["day"]["value"]: pdate += 
f"-{ppd['day']['value']}" return pdate def get_work_doi(work): - ''' Get a DOI from an ORCID work - Keyword arguments: - work: ORCID work - Returns: - DOI - ''' - if not work['external-ids']['external-id']: - return '' - for eid in work['external-ids']['external-id']: - if eid['external-id-type'] != 'doi': + """Get a DOI from an ORCID work + Keyword arguments: + work: ORCID work + Returns: + DOI + """ + if not work["external-ids"]["external-id"]: + return "" + for eid in work["external-ids"]["external-id"]: + if eid["external-id-type"] != "doi": continue - if 'external-id-normalized' in eid: - return eid['external-id-normalized']['value'] - if 'external-id-value' in eid: - return eid['external-id-url']['value'] - return '' + if "external-id-normalized" in eid: + return eid["external-id-normalized"]["value"] + if "external-id-value" in eid: + return eid["external-id-url"]["value"] + return "" def orcid_payload(oid, orc, eid=None): - ''' Generate a payload for searching the dois collection by ORCID or employeeId - Keyword arguments: - oid: ORCID or employeeId - orc: orcid record - eid: employeeId boolean - Returns: - Payload - ''' + """Generate a payload for searching the dois collection by ORCID or employeeId + Keyword arguments: + oid: ORCID or employeeId + orc: orcid record + eid: employeeId boolean + Returns: + Payload + """ # Name only search - payload = {"$and": [{"$or": [{"author.given": {"$in": orc['given']}}, - {"creators.givenName": {"$in": orc['given']}}]}, - {"$or": [{"author.family": {"$in": orc['family']}}, - {"creators.familyName": {"$in": orc['family']}}]}] - } + payload = { + "$and": [ + { + "$or": [ + {"author.given": {"$in": orc["given"]}}, + {"creators.givenName": {"$in": orc["given"]}}, + ] + }, + { + "$or": [ + {"author.family": {"$in": orc["family"]}}, + {"creators.familyName": {"$in": orc["family"]}}, + ] + }, + ] + } if eid and not oid: # Employee ID only search payload = {"$or": [{"jrc_author": eid}, {"$and": payload["$and"]}]} elif oid and 
eid: # Search by either name or employee ID - payload = {"$or": [{"orcid": oid}, {"jrc_author": eid}, {"$and": payload["$and"]}]} + payload = { + "$or": [{"orcid": oid}, {"jrc_author": eid}, {"$and": payload["$and"]}] + } return payload def get_dois_for_orcid(oid, orc, use_eid, both): - ''' Generate DOIs for a single user - Keyword arguments: - oid: ORCID or employeeId - orc: orcid record - use_eid: use employeeId boolean - both: search by both ORCID and employeeId - Returns: - HTML and a list of DOIs - ''' + """Generate DOIs for a single user + Keyword arguments: + oid: ORCID or employeeId + orc: orcid record + use_eid: use employeeId boolean + both: search by both ORCID and employeeId + Returns: + HTML and a list of DOIs + """ try: if use_eid: payload = {"jrc_author": oid} elif both: - eid = orc['employeeId'] if 'employeeId' in orc else None + eid = orc["employeeId"] if "employeeId" in orc else None payload = orcid_payload(oid, orc, eid) else: payload = orcid_payload(oid, orc) - rows = DB['dis'].dois.find(payload) + rows = DB["dis"].dois.find(payload) except Exception as err: - raise CustomException(err, "Could not find in dois collection by name.") from err + raise CustomException( + err, "Could not find in dois collection by name." 
+ ) from err return rows def generate_works_table(rows, name=None): - ''' Generate table HTML for a person's works - Keyword arguments: - rows: rows from dois collection - name: search key [optional] - Returns: - HTML and a list of DOIs - ''' + """Generate table HTML for a person's works + Keyword arguments: + rows: rows from dois collection + name: search key [optional] + Returns: + HTML and a list of DOIs + """ works = [] dois = [] authors = {} html = "" fileoutput = "" for row in rows: - doi = doi_link(row['doi']) if row['doi'] else " " - if 'title' in row and isinstance(row['title'], str): - title = row['title'] + doi = doi_link(row["doi"]) if row["doi"] else " " + if "title" in row and isinstance(row["title"], str): + title = row["title"] else: title = DL.get_title(row) - dois.append(row['doi']) - payload = {"date": DL.get_publishing_date(row), - "doi": doi, - "title": title - } + dois.append(row["doi"]) + payload = {"date": DL.get_publishing_date(row), "doi": doi, "title": title} works.append(payload) fileoutput += f"{payload['date']}\t{row['doi']}\t{payload['title']}\n" if name: alist = DL.get_author_details(row) if alist: for auth in alist: - if "family" in auth and "given" in auth \ - and auth["family"].lower() == name.lower(): + if ( + "family" in auth + and "given" in auth + and auth["family"].lower() == name.lower() + ): authors[f"{auth['given']} {auth['family']}"] = True else: print(f"Could not get author details for {row['doi']}") if not works: return html, [] - html += "" \ - + '' - for work in sorted(works, key=lambda row: row['date'], reverse=True): - html += f"" \ - + f"" + html += ( + "
PublishedDOITitle
{work['date']}{work['doi'] if work['doi'] else ' '}{work['title']}
" + + "" + ) + for work in sorted(works, key=lambda row: row["date"], reverse=True): + html += ( + f"" + + f"" + ) if dois: html += "
PublishedDOITitle
{work['date']}{work['doi'] if work['doi'] else ' '}{work['title']}
" if authors: - html = f"
Authors found: {', '.join(sorted(authors.keys()))}
" \ - + f"This may include non-Janelia authors
{html}" - html = create_downloadable('works', ['Published', 'DOI', 'Title'], fileoutput) + html + html = ( + f"
Authors found: {', '.join(sorted(authors.keys()))}
" + + f"This may include non-Janelia authors
{html}" + ) + html = ( + create_downloadable("works", ["Published", "DOI", "Title"], fileoutput) + html + ) html = f"DOIs: {len(works)}
" + html return html, dois def get_orcid_from_db(oid, use_eid=False, both=False, bare=False): - ''' Generate HTML for an ORCID or employeeId that is in the orcid collection - Keyword arguments: - oid: ORCID or employeeId - use_eid: use employeeId boolean - both: search by both ORCID and employeeId - bare: entry has no ORCID or employeeId - Returns: - HTML and a list of DOIs - ''' + """Generate HTML for an ORCID or employeeId that is in the orcid collection + Keyword arguments: + oid: ORCID or employeeId + use_eid: use employeeId boolean + both: search by both ORCID and employeeId + bare: entry has no ORCID or employeeId + Returns: + HTML and a list of DOIs + """ try: if bare: - orc = DB['dis'].orcid.find_one({"_id": bson.ObjectId(oid)}) + orc = DB["dis"].orcid.find_one({"_id": bson.ObjectId(oid)}) else: - payload = {'userIdO365' if use_eid else 'orcid': oid} - orc = DB['dis'].orcid.find_one(payload) + payload = {"userIdO365" if use_eid else "orcid": oid} + orc = DB["dis"].orcid.find_one(payload) except Exception as err: - raise CustomException(err, "Could not find_one in orcid collection by ORCID ID.") from err + raise CustomException( + err, "Could not find_one in orcid collection by ORCID ID." + ) from err if not orc: return "", [] html = "
" - if use_eid and 'orcid' in orc: - html += f"" + if use_eid and "orcid" in orc: + html += ( + f"" + ) html += f"" html += f"" - if 'userIdO365' in orc: - link = "{orc['userIdO365']}" + if "userIdO365" in orc: + link = ( + "{orc['userIdO365']}" + ) html += f"" - if 'affiliations' in orc: - html += f"" + if "affiliations" in orc: + html += ( + f"" + ) html += "
ORCID:" \ - + f"{orc['orcid']}
ORCID:" + + f"{orc['orcid']}
Given name:{', '.join(sorted(orc['given']))}
Family name:{', '.join(sorted(orc['family']))}
User ID:{link}
Affiliations:{', '.join(orc['affiliations'])}
Affiliations:{', '.join(orc['affiliations'])}

" try: if use_eid: - oid = orc['employeeId'] + oid = orc["employeeId"] rows = get_dois_for_orcid(oid, orc, use_eid, both) except Exception as err: raise err @@ -543,88 +626,106 @@ def get_orcid_from_db(oid, use_eid=False, both=False, bare=False): def add_orcid_works(data, dois): - ''' Generate HTML for a list of works from ORCID - Keyword arguments: - data: ORCID data - dois: list of DOIs from dois collection - Returns: - HTML for a list of works from ORCID - ''' + """Generate HTML for a list of works from ORCID + Keyword arguments: + data: ORCID data + dois: list of DOIs from dois collection + Returns: + HTML for a list of works from ORCID + """ html = inner = "" works = 0 - for work in data['activities-summary']['works']['group']: - wsumm = work['work-summary'][0] + for work in data["activities-summary"]["works"]["group"]: + wsumm = work["work-summary"][0] pdate = get_work_publication_date(wsumm) doi = get_work_doi(work) if (not doi) or (doi in dois): continue works += 1 if not doi: - inner += f"{pdate} " \ - + f"{wsumm['title']['title']['value']}" + inner += ( + f"{pdate} " + + f"{wsumm['title']['title']['value']}" + ) continue link = "" - if work['external-ids']['external-id'][0]['external-id-url']: - if work['external-ids']['external-id'][0]['external-id-url']: - link = "{doi}" + if work["external-ids"]["external-id"][0]["external-id-url"]: + if work["external-ids"]["external-id"][0]["external-id-url"]: + link = ( + "{doi}" + ) else: link = doi_link(doi) - inner += f"{pdate}{link}" \ - + f"{wsumm['title']['title']['value']}" + inner += ( + f"{pdate}{link}" + + f"{wsumm['title']['title']['value']}" + ) if inner: title = "title is" if works == 1 else f"{works} titles are" - html += f"
The additional {title} from ORCID. Note that titles below may " \ - + "be self-reported, may not have DOIs available, or may be from the author's " \ - + "employment outside of Janelia.
" - html += '' \ - + '' \ - + f"{inner}
PublishedDOITitle
" + html += ( + f"
The additional {title} from ORCID. Note that titles below may " + + "be self-reported, may not have DOIs available, or may be from the author's " + + "employment outside of Janelia.
" + ) + html += ( + '' + + "" + + f"{inner}
PublishedDOITitle
" + ) return html def generate_user_table(rows): - ''' Generate HTML for a list of users - Keyword arguments: - rows: rows from orcid collection - Returns: - HTML for a list of authors with a count - ''' + """Generate HTML for a list of users + Keyword arguments: + rows: rows from orcid collection + Returns: + HTML for a list of authors with a count + """ count = 0 - html = '' \ - + '' \ - + '' + html = ( + '
ORCIDGiven nameFamily nameStatus
' + + "" + + "" + ) for row in rows: count += 1 - if 'orcid' in row: + if "orcid" in row: link = f"{row['orcid']}" - elif 'userIdO365' in row: + elif "userIdO365" in row: link = f"No ORCID found" else: link = f"No ORCID found" - auth = DL.get_single_author_details(row, DB['dis'].orcid) + auth = DL.get_single_author_details(row, DB["dis"].orcid) badges = get_badges(auth) - rclass = 'other' if (auth and auth['alumni']) else 'active' - html += f"" \ - + f"" - html += '
ORCIDGiven nameFamily nameStatus
{link}{', '.join(row['given'])}{', '.join(row['family'])}{' '.join(badges)}
' - cbutton = "" + rclass = "other" if (auth and auth["alumni"]) else "active" + html += ( + f"{link}{', '.join(row['given'])}" + + f"{', '.join(row['family'])}{' '.join(badges)}" + ) + html += "" + cbutton = ( + '" + ) html = cbutton + html return html, count + # ****************************************************************************** # * DOI utility functions * # ****************************************************************************** + def doi_link(doi): - ''' Return a link to a DOI or DOIs - Keyword arguments: - doi: DOI - Returns: - newdoi: HTML link(s) to DOI(s) as a string - ''' + """Return a link to a DOI or DOIs + Keyword arguments: + doi: DOI + Returns: + newdoi: HTML link(s) to DOI(s) as a string + """ if not doi: return "" doilist = [doi] if isinstance(doi, str) else doi @@ -639,40 +740,40 @@ def doi_link(doi): def get_doi(doi): - ''' Get a single DOI record - Keyword arguments: - doi: DOI - Returns: - source: data source - data: data from response - ''' + """Get a single DOI record + Keyword arguments: + doi: DOI + Returns: + source: data source + data: data from response + """ if DL.is_datacite(doi): resp = JRC.call_datacite(doi) - source = 'datacite' - data = resp['data']['attributes'] if 'data' in resp else {} + source = "datacite" + data = resp["data"]["attributes"] if "data" in resp else {} else: resp = JRC.call_crossref(doi) - source = 'crossref' - data = resp['message'] if 'message' in resp else {} + source = "crossref" + data = resp["message"] if "message" in resp else {} return source, data def add_jrc_fields(row): - ''' Add a table of custom JRC fields - Keyword arguments: - row: DOI record - Returns: - HTML - ''' + """Add a table of custom JRC fields + Keyword arguments: + row: DOI record + Returns: + HTML + """ jrc = {} prog = re.compile("^jrc_") for key, val in row.items(): - if not re.match(prog, key) or key in app.config['DO_NOT_DISPLAY']: + if not re.match(prog, key) or key in app.config["DO_NOT_DISPLAY"]: continue - if 
isinstance(val, list) and key not in ('jrc_preprint'): + if isinstance(val, list) and key not in ("jrc_preprint"): try: if isinstance(val[0], dict): - val = ", ".join(sorted(elem['name'] for elem in val)) + val = ", ".join(sorted(elem["name"] for elem in val)) else: val = ", ".join(sorted(val)) except TypeError: @@ -683,14 +784,14 @@ def add_jrc_fields(row): html = '' for key in sorted(jrc): val = jrc[key] - if key == 'jrc_author': + if key == "jrc_author": link = [] for auth in val.split(", "): link.append(f"{auth}") val = ", ".join(link) - if key == 'jrc_preprint': + if key == "jrc_preprint": val = doi_link(val) - elif 'jrc_tag' in key: + elif "jrc_tag" in key: link = [] for aff in val.split(", "): link.append(f"{aff}") @@ -701,280 +802,341 @@ def add_jrc_fields(row): def add_relations(row): - ''' Create a list of relations - Keyword arguments: - row: DOI record - Returns: - HTML - ''' + """Create a list of relations + Keyword arguments: + row: DOI record + Returns: + HTML + """ html = "" - if "relation" in row and row['relation']: + if "relation" in row and row["relation"]: # Crossref relations - for rel in row['relation']: + for rel in row["relation"]: used = [] - for itm in row['relation'][rel]: - if itm['id'] in used: + for itm in row["relation"][rel]: + if itm["id"] in used: continue html += f"This DOI {rel.replace('-', ' ')} {doi_link(itm['id'])}
" - used.append(itm['id']) - elif 'relatedIdentifiers' in row and row['relatedIdentifiers']: + used.append(itm["id"]) + elif "relatedIdentifiers" in row and row["relatedIdentifiers"]: # DataCite relations - for rel in row['relatedIdentifiers']: - if 'relatedIdentifierType' in rel and rel['relatedIdentifierType'] == 'DOI': - words = re.split('(?<=.)(?=[A-Z])', rel['relationType']) - html += f"This DOI {' '.join(wrd.lower() for wrd in words)} " \ - + f"{doi_link(rel['relatedIdentifier'])}
" + for rel in row["relatedIdentifiers"]: + if "relatedIdentifierType" in rel and rel["relatedIdentifierType"] == "DOI": + words = re.split("(?<=.)(?=[A-Z])", rel["relationType"]) + html += ( + f"This DOI {' '.join(wrd.lower() for wrd in words)} " + + f"{doi_link(rel['relatedIdentifier'])}
" + ) return html def get_migration_data(row): - ''' Create a migration record for a single DOI - Keyword arguments: - doi: doi record - orgs: dictionary of organizations/codes - Returns: - migration dictionary - ''' + """Create a migration record for a single DOI + Keyword arguments: + doi: doi record + orgs: dictionary of organizations/codes + Returns: + migration dictionary + """ rec = {} # Author tags = [] - if 'jrc_tag' in row and row['jrc_tag']: - if isinstance(row['jrc_tag'][0], dict): - for atag in row['jrc_tag']: + if "jrc_tag" in row and row["jrc_tag"]: + if isinstance(row["jrc_tag"][0], dict): + for atag in row["jrc_tag"]: tags.append(atag) - #else: + # else: # #TAG Old style - can delete after cutover # for atag in row['jrc_tag']: # code = orgs[atag] if atag in orgs else None # tags.append({"name": atag, "code": code}) - if 'jrc_author' in row: - rec['jrc_author'] = row['jrc_author'] + if "jrc_author" in row: + rec["jrc_author"] = row["jrc_author"] if tags: - rec['tags'] = tags + rec["tags"] = tags # Additional data - if row['jrc_obtained_from'] == 'Crossref' and 'abstract' in row: - rec['abstract'] = row['abstract'] - rec['journal'] = DL.get_journal(row) - if 'jrc_publishing_date' in row: - rec['jrc_publishing_date'] = row['jrc_publishing_date'] - if 'publisher' in row: - rec['publisher'] = row['publisher'] - rec['title'] = DL.get_title(row) - if 'URL' in row: - rec['url'] = row['URL'] + if row["jrc_obtained_from"] == "Crossref" and "abstract" in row: + rec["abstract"] = row["abstract"] + rec["journal"] = DL.get_journal(row) + if "jrc_publishing_date" in row: + rec["jrc_publishing_date"] = row["jrc_publishing_date"] + if "publisher" in row: + rec["publisher"] = row["publisher"] + rec["title"] = DL.get_title(row) + if "URL" in row: + rec["url"] = row["URL"] return rec def compute_preprint_data(rows): - ''' Create a dictionaries of preprint data - Keyword arguments: - rows: preprint types - Returns: - data: preprint data dictionary - preprint: preprint 
types dictionary - ''' - data = {'Has preprint relation': 0} + """Create a dictionaries of preprint data + Keyword arguments: + rows: preprint types + Returns: + data: preprint data dictionary + preprint: preprint types dictionary + """ + data = {"Has preprint relation": 0} preprint = {} for row in rows: - if 'type' in row['_id']: - preprint[row['_id']['type']] = row['count'] - data['Has preprint relation'] += row['count'] + if "type" in row["_id"]: + preprint[row["_id"]["type"]] = row["count"] + data["Has preprint relation"] += row["count"] else: - preprint['DataCite'] = row['count'] - data['Has preprint relation'] += row['count'] - for key in ('journal-article', 'posted-content', 'DataCite'): + preprint["DataCite"] = row["count"] + data["Has preprint relation"] += row["count"] + for key in ("journal-article", "posted-content", "DataCite"): if key not in preprint: preprint[key] = 0 return data, preprint def counts_by_type(rows): - ''' Count DOIs by type - Keyword arguments: - rows: aggregate rows from dois collection - Returns: - Dictionary of type counts - ''' + """Count DOIs by type + Keyword arguments: + rows: aggregate rows from dois collection + Returns: + Dictionary of type counts + """ typed = {} preprints = 0 for row in rows: - typ = row['_id']['type'] if 'type' in row['_id'] else "DataCite" - sub = row['_id']['subtype'] if 'subtype' in row['_id'] else "" - if sub == 'preprint': - preprints += row['count'] - typ = 'posted-content' - elif (typ == 'DataCite' and row['_id']['DataCite'] == 'Preprint'): - preprints += row['count'] + typ = row["_id"]["type"] if "type" in row["_id"] else "DataCite" + sub = row["_id"]["subtype"] if "subtype" in row["_id"] else "" + if sub == "preprint": + preprints += row["count"] + typ = "posted-content" + elif typ == "DataCite" and row["_id"]["DataCite"] == "Preprint": + preprints += row["count"] if typ not in typed: typed[typ] = 0 - typed[typ] += row['count'] - typed['preprints'] = preprints + typed[typ] += row["count"] + 
typed["preprints"] = preprints return typed def get_first_last_authors(year): - ''' Get first and last author counts - Keyword arguments: - year: year to get counts for - Returns: - First and last author counts - ''' - stat = {'first': {}, 'last': {}, 'any': {}} + """Get first and last author counts + Keyword arguments: + year: year to get counts for + Returns: + First and last author counts + """ + stat = {"first": {}, "last": {}, "any": {}} for which in ("first", "last", "any"): - if which == 'any': - payload = [{"$match": {"jrc_publishing_date": {"$regex": "^"+ year}, - "jrc_author": {"$exists": True}}}, - {"$group": {"_id": {"type": "$type", "subtype": "$subtype", - "DataCite": "$types.resourceTypeGeneral"}, - "count": {"$sum": 1}}} - ] + if which == "any": + payload = [ + { + "$match": { + "jrc_publishing_date": {"$regex": "^" + year}, + "jrc_author": {"$exists": True}, + } + }, + { + "$group": { + "_id": { + "type": "$type", + "subtype": "$subtype", + "DataCite": "$types.resourceTypeGeneral", + }, + "count": {"$sum": 1}, + } + }, + ] else: - payload = [{"$match": {"jrc_publishing_date": {"$regex": "^"+ year}, - f"jrc_{which}_author": {"$exists": True}}}, - {"$group": {"_id": {"type": "$type", "subtype": "$subtype", - "DataCite": "$types.resourceTypeGeneral"}, - "count": {"$sum": 1}}} - ] + payload = [ + { + "$match": { + "jrc_publishing_date": {"$regex": "^" + year}, + f"jrc_{which}_author": {"$exists": True}, + } + }, + { + "$group": { + "_id": { + "type": "$type", + "subtype": "$subtype", + "DataCite": "$types.resourceTypeGeneral", + }, + "count": {"$sum": 1}, + } + }, + ] try: - rows = DB['dis'].dois.aggregate(payload) + rows = DB["dis"].dois.aggregate(payload) except Exception as err: - return render_template('error.html', urlroot=request.url_root, - title=render_warning("Could not get yearly metrics " \ - + "from dois collection"), - message=error_message(err)) + return render_template( + "error.html", + urlroot=request.url_root, + title=render_warning( 
+                    "Could not get yearly metrics " + "from dois collection"
+                ),
+                message=error_message(err),
+            )
         for row in rows:
-            typ = row['_id']['type'] if 'type' in row['_id'] else "DataCite"
-            sub = row['_id']['subtype'] if 'subtype' in row['_id'] else ""
-            if sub == 'preprint':
-                typ = 'posted-content'
+            typ = row["_id"]["type"] if "type" in row["_id"] else "DataCite"
+            sub = row["_id"]["subtype"] if "subtype" in row["_id"] else ""
+            if sub == "preprint":
+                typ = "posted-content"
             if typ not in stat[which]:
                 stat[which][typ] = 0
-            stat[which][typ] += row['count']
-            if sub == 'preprint' or (type == 'DataCite' and row['_id']['DataCite'] == 'Preprint'):
-                if 'preprints' not in stat[which]:
-                    stat[which]['preprints'] = 0
-                stat[which]['preprints'] += row['count']
-    return stat['first'], stat['last'], stat['any']
+            stat[which][typ] += row["count"]
+            if sub == "preprint" or (
+                typ == "DataCite" and row["_id"]["DataCite"] == "Preprint"
+            ):
+                if "preprints" not in stat[which]:
+                    stat[which]["preprints"] = 0
+                stat[which]["preprints"] += row["count"]
+    return stat["first"], stat["last"], stat["any"]
 
 
 def get_no_relation(year=None):
-    ''' Get DOIs with no relation
-        Keyword arguments:
-          year: year (optional)
-        Returns:
-          Dictionary of types/subtypes with no relation
-    '''
+    """Get DOIs with no relation
+    Keyword arguments:
+      year: year (optional)
+    Returns:
+      Dictionary of types/subtypes with no relation
+    """
     no_relation = {"Crossref": {}, "DataCite": {}}
-    payload = {"Crossref_journal": {"type": "journal-article", "subtype": {"$ne": "preprint"},
-                                    "jrc_preprint": {"$exists": False}},
-               "Crossref_preprint": {"subtype": "preprint", "jrc_preprint": {"$exists": False}},
-               "DataCite_journal": {"jrc_obtained_from": "DataCite",
-                                    "types.resourceTypeGeneral": {"$ne": "Preprint"},
-                                    "jrc_preprint": {"$exists": False}},
-               "DataCite_preprint": {"types.resourceTypeGeneral": "Preprint",
-                                     "jrc_preprint": {"$exists": False}}
-              }
+    payload = {
+        "Crossref_journal": {
+            "type": "journal-article",
+ "subtype": {"$ne": "preprint"}, + "jrc_preprint": {"$exists": False}, + }, + "Crossref_preprint": { + "subtype": "preprint", + "jrc_preprint": {"$exists": False}, + }, + "DataCite_journal": { + "jrc_obtained_from": "DataCite", + "types.resourceTypeGeneral": {"$ne": "Preprint"}, + "jrc_preprint": {"$exists": False}, + }, + "DataCite_preprint": { + "types.resourceTypeGeneral": "Preprint", + "jrc_preprint": {"$exists": False}, + }, + } if year: for pay in payload.values(): - pay["jrc_publishing_date"] = {"$regex": "^"+ year} + pay["jrc_publishing_date"] = {"$regex": "^" + year} for key, val in payload.items(): try: - cnt = DB['dis'].dois.count_documents(val) + cnt = DB["dis"].dois.count_documents(val) except Exception as err: raise err - src, typ = key.split('_') + src, typ = key.split("_") no_relation[src][typ] = cnt return no_relation def get_preprint_stats(rows): - ''' Create a dictionary of preprint statistics - Keyword arguments: - rows: types/subtypes over years - Returns: - Preprint statistics dictionary - ''' + """Create a dictionary of preprint statistics + Keyword arguments: + rows: types/subtypes over years + Returns: + Preprint statistics dictionary + """ stat = {} for row in rows: - if 'type' not in row['_id']: + if "type" not in row["_id"]: continue - if 'sub' in row['_id'] and row['_id']['sub'] == 'preprint': - if row['_id']['year'] not in stat: - stat[row['_id']['year']] = {} - for sub in ('journal', 'preprint'): - if sub not in stat[row['_id']['year']]: - stat[row['_id']['year']][sub] = 0 - stat[row['_id']['year']]['preprint'] += row['count'] - elif row['_id']['type'] == 'journal-article': - if row['_id']['year'] not in stat: - stat[row['_id']['year']] = {} - for sub in ('journal', 'preprint'): - if sub not in stat[row['_id']['year']]: - stat[row['_id']['year']][sub] = 0 - stat[row['_id']['year']]['journal'] += row['count'] + if "sub" in row["_id"] and row["_id"]["sub"] == "preprint": + if row["_id"]["year"] not in stat: + stat[row["_id"]["year"]] = 
{} + for sub in ("journal", "preprint"): + if sub not in stat[row["_id"]["year"]]: + stat[row["_id"]["year"]][sub] = 0 + stat[row["_id"]["year"]]["preprint"] += row["count"] + elif row["_id"]["type"] == "journal-article": + if row["_id"]["year"] not in stat: + stat[row["_id"]["year"]] = {} + for sub in ("journal", "preprint"): + if sub not in stat[row["_id"]["year"]]: + stat[row["_id"]["year"]][sub] = 0 + stat[row["_id"]["year"]]["journal"] += row["count"] return stat def get_source_data(year): - ''' Get DOI data by source and type/subtype or resourceTypeGeneral - Keyword arguments: - year: year to get data for - Returns: - Data dictionary and html dictionary - ''' + """Get DOI data by source and type/subtype or resourceTypeGeneral + Keyword arguments: + year: year to get data for + Returns: + Data dictionary and html dictionary + """ # Crossref - if year != 'All': - match = {"jrc_obtained_from": "Crossref", - "jrc_publishing_date": {"$regex": "^"+ year}} + if year != "All": + match = { + "jrc_obtained_from": "Crossref", + "jrc_publishing_date": {"$regex": "^" + year}, + } else: match = {"jrc_obtained_from": "Crossref"} - payload = [{"$match": match}, - {"$group": {"_id": {"source": "$jrc_obtained_from", "type": "$type", - "subtype": "$subtype"}, - "count": {"$sum": 1}}}, - ] + payload = [ + {"$match": match}, + { + "$group": { + "_id": { + "source": "$jrc_obtained_from", + "type": "$type", + "subtype": "$subtype", + }, + "count": {"$sum": 1}, + } + }, + ] try: - rows = DB['dis'].dois.aggregate(payload) + rows = DB["dis"].dois.aggregate(payload) except Exception as err: - return render_template('error.html', urlroot=request.url_root, - title=render_warning("Could not get Crossref types from dois"), - message=error_message(err)) + return render_template( + "error.html", + urlroot=request.url_root, + title=render_warning("Could not get Crossref types from dois"), + message=error_message(err), + ) data = {"Crossref": 0, "DataCite": 0} hdict = {} for row in rows: - for 
field in ('type', 'subtype'): - if field not in row['_id']: - row['_id'][field] = '' - data['Crossref'] += row['count'] - hdict["_".join([row['_id']['source'], row['_id']['type'], - row['_id']['subtype']])] = row['count'] + for field in ("type", "subtype"): + if field not in row["_id"]: + row["_id"][field] = "" + data["Crossref"] += row["count"] + hdict[ + "_".join([row["_id"]["source"], row["_id"]["type"], row["_id"]["subtype"]]) + ] = row["count"] # DataCite - match['jrc_obtained_from'] = "DataCite" - payload = [{"$match": match}, - {"$group": {"_id": "$types.resourceTypeGeneral","count": {"$sum": 1}}} - ] + match["jrc_obtained_from"] = "DataCite" + payload = [ + {"$match": match}, + {"$group": {"_id": "$types.resourceTypeGeneral", "count": {"$sum": 1}}}, + ] try: - rows = DB['dis'].dois.aggregate(payload) + rows = DB["dis"].dois.aggregate(payload) except Exception as err: - return render_template('error.html', urlroot=request.url_root, - title=render_warning("Could not get DataCite types from dois"), - message=error_message(err)) + return render_template( + "error.html", + urlroot=request.url_root, + title=render_warning("Could not get DataCite types from dois"), + message=error_message(err), + ) for row in rows: - data['DataCite'] += row['count'] - hdict["_".join(['DataCite', row['_id'], ""])] = row['count'] + data["DataCite"] += row["count"] + hdict["_".join(["DataCite", row["_id"], ""])] = row["count"] return data, hdict -def s2_citation_count(doi, fmt='plain'): - ''' Get citation count from Semantic Scholar - Keyword arguments: - doi: DOI - fmt: format (plain or html) - Returns: - Citation count - ''' +def s2_citation_count(doi, fmt="plain"): + """Get citation count from Semantic Scholar + Keyword arguments: + doi: DOI + fmt: format (plain or html) + Returns: + Citation count + """ url = f"{app.config['S2_GRAPH']}paper/DOI:{doi}?fields=citationCount" - headers = {'x-api-key': app.config['S2_API_KEY']} + headers = {"x-api-key": app.config["S2_API_KEY"]} try: 
resp = requests.get(url, headers=headers, timeout=10) if resp.status_code == 429: @@ -982,11 +1144,13 @@ def s2_citation_count(doi, fmt='plain'): if resp.status_code != 200: return 0 data = resp.json() - if fmt == 'html': - cnt = f"" \ - + f"{data['citationCount']}" + if fmt == "html": + cnt = ( + f"" + + f"{data['citationCount']}" + ) else: - cnt = data['citationCount'] + cnt = data["citationCount"] return cnt except Exception: return 0 @@ -996,15 +1160,16 @@ def s2_citation_count(doi, fmt='plain'): # * Badge utility functions * # ****************************************************************************** + def tiny_badge(btype, msg, link=None): - ''' Create HTML for a [very] small badge - Keyword arguments: - btype: badge type (success, danger, etc.) - msg: message to show on badge - link: link to other web page - Returns: - HTML - ''' + """Create HTML for a [very] small badge + Keyword arguments: + btype: badge type (success, danger, etc.) + msg: message to show on badge + link: link to other web page + Returns: + HTML + """ html = f"{msg}" if link: html = f"{html}" @@ -1012,126 +1177,133 @@ def tiny_badge(btype, msg, link=None): def get_badges(auth): - ''' Create a list of badges for an author - Keyword arguments: - auth: detailed author record - Returns: - List of HTML badges - ''' + """Create a list of badges for an author + Keyword arguments: + auth: detailed author record + Returns: + List of HTML badges + """ badges = [] - if 'in_database' in auth and auth['in_database']: + if "in_database" in auth and auth["in_database"]: badges.append(f"{tiny_badge('success', 'In database')}") - if auth['alumni']: + if auth["alumni"]: badges.append(f"{tiny_badge('danger', 'Alumni')}") - elif 'validated' not in auth or not auth['validated']: + elif "validated" not in auth or not auth["validated"]: badges.append(f"{tiny_badge('warning', 'Not validated')}") - if 'orcid' not in auth or not auth['orcid']: + if "orcid" not in auth or not auth["orcid"]: 
badges.append(f"{tiny_badge('urgent', 'No ORCID')}") - if auth['asserted']: + if auth["asserted"]: badges.append(f"{tiny_badge('info', 'Janelia affiliation')}") - if 'duplicate_name' in auth: + if "duplicate_name" in auth: badges.append(f"{tiny_badge('warning', 'Duplicate name')}") else: badges.append(f"{tiny_badge('danger', 'Not in database')}") - if 'asserted' in auth and auth['asserted']: + if "asserted" in auth and auth["asserted"]: badges.append(f"{tiny_badge('info', 'Janelia affiliation')}") return badges def show_tagged_authors(authors): - ''' Create a list of Janelian authors (with badges and tags) - Keyword arguments: - authors: list of detailed authors from a publication - Returns: - List of HTML authors - ''' + """Create a list of Janelian authors (with badges and tags) + Keyword arguments: + authors: list of detailed authors from a publication + Returns: + List of HTML authors + """ alist = [] count = 0 for auth in authors: - if (not auth['janelian']) and (not auth['asserted']) and (not auth['alumni']): + if (not auth["janelian"]) and (not auth["asserted"]) and (not auth["alumni"]): continue - if auth['janelian'] or auth['asserted']: + if auth["janelian"] or auth["asserted"]: count += 1 who = f"{auth['given']} {auth['family']}" - if 'orcid' in auth and auth['orcid']: + if "orcid" in auth and auth["orcid"]: who = f"{who}" - elif 'userIdO365' in auth and auth['userIdO365']: + elif "userIdO365" in auth and auth["userIdO365"]: who = f"{who}" badges = get_badges(auth) tags = [] - if 'group' in auth: - tags.append(auth['group']) - if 'tags' in auth: - for tag in auth['tags']: + if "group" in auth: + tags.append(auth["group"]) + if "tags" in auth: + for tag in auth["tags"]: if tag not in tags: tags.append(tag) tags.sort() row = f"" alist.append(row) - return f"
{who}{' '.join(badges)}{', '.join(tags)}
{''.join(alist)}
", count + return ( + f"{''.join(alist)}
", + count, + ) def add_orcid_badges(orc): - ''' Generate badges for an ORCID ID that is in the orcid collection - Keyword arguments: - orc: row from orcid collection - Returns: - List of badges - ''' + """Generate badges for an ORCID ID that is in the orcid collection + Keyword arguments: + orc: row from orcid collection + Returns: + List of badges + """ badges = [] - badges.append(tiny_badge('success', 'In database')) - if 'duplicate_name' in orc: - badges.append(tiny_badge('warning', 'Duplicate name')) - if 'orcid' not in orc or not orc['orcid']: + badges.append(tiny_badge("success", "In database")) + if "duplicate_name" in orc: + badges.append(tiny_badge("warning", "Duplicate name")) + if "orcid" not in orc or not orc["orcid"]: badges.append(f"{tiny_badge('urgent', 'No ORCID')}") - if 'alumni' in orc: - badges.append(tiny_badge('danger', 'Alumni')) - if 'employeeId' not in orc: - badges.append(tiny_badge('warning', 'Not validated')) + if "alumni" in orc: + badges.append(tiny_badge("danger", "Alumni")) + if "employeeId" not in orc: + badges.append(tiny_badge("warning", "Not validated")) return badges + # ****************************************************************************** # * General utility functions * # ****************************************************************************** + def random_string(strlen=8): - ''' Generate a random string of letters and digits - Keyword arguments: - strlen: length of generated string - ''' + """Generate a random string of letters and digits + Keyword arguments: + strlen: length of generated string + """ cmps = string.ascii_letters + string.digits - return ''.join(random.choice(cmps) for i in range(strlen)) + return "".join(random.choice(cmps) for i in range(strlen)) def create_downloadable(name, header, content): - ''' Generate a downloadable content file - Keyword arguments: - name: base file name - header: table header - content: table content - Returns: - File name - ''' + """Generate a downloadable content 
file + Keyword arguments: + name: base file name + header: table header + content: table content + Returns: + File name + """ fname = f"{name}_{random_string()}_{datetime.today().strftime('%Y%m%d%H%M%S')}.tsv" with open(f"/tmp/{fname}", "w", encoding="utf8") as text_file: if header: content = "\t".join(header) + "\n" + content text_file.write(content) - return f'Download tab-delimited file' - - -def humansize(num, suffix='B', places=2, space='disk'): - ''' Return a human-readable storage size - Keyword arguments: - num: size - suffix: default suffix - space: "disk" or "mem" - Returns: - string - ''' - limit = 1024.0 if space == 'disk' else 1000.0 - for unit in ['', 'K', 'M', 'G', 'T']: + return ( + f'Download tab-delimited file' + ) + + +def humansize(num, suffix="B", places=2, space="disk"): + """Return a human-readable storage size + Keyword arguments: + num: size + suffix: default suffix + space: "disk" or "mem" + Returns: + string + """ + limit = 1024.0 if space == "disk" else 1000.0 + for unit in ["", "K", "M", "G", "T"]: if abs(num) < limit: return f"{num:.{places}f}{unit}{suffix}" num /= limit @@ -1139,24 +1311,24 @@ def humansize(num, suffix='B', places=2, space='disk'): def dloop(row, keys, sep="\t"): - ''' Generate a string of joined velues from a dictionary - Keyword arguments: - row: dictionary - keys: list of keys - sep: separator - Returns: - Joined values from a dictionary - ''' + """Generate a string of joined velues from a dictionary + Keyword arguments: + row: dictionary + keys: list of keys + sep: separator + Returns: + Joined values from a dictionary + """ return sep.join([str(row[fld]) for fld in keys]) def last_thursday(): - ''' Calculate the date of the most recent Thursday - Keyword arguments: - None - Returns: - Date of the most recent Thursday - ''' + """Calculate the date of the most recent Thursday + Keyword arguments: + None + Returns: + Date of the most recent Thursday + """ today = date.today() offset = (today.weekday() - 3) % 7 if 
offset: @@ -1165,29 +1337,31 @@ def last_thursday(): def weeks_ago(weeks): - ''' Calculate the date of a number of weeks ago - Keyword arguments: - weeks: number of weeks - Returns: - Date of a number of weeks ago - ''' + """Calculate the date of a number of weeks ago + Keyword arguments: + weeks: number of weeks + Returns: + Date of a number of weeks ago + """ today = date.today() return today - timedelta(weeks=weeks) def year_pulldown(prefix, all_years=True): - ''' Generate a year pulldown - Keyword arguments: - prefic: navigation prefix - Returns: - Pulldown HTML - ''' - years = ['All'] if all_years else [] + """Generate a year pulldown + Keyword arguments: + prefic: navigation prefix + Returns: + Pulldown HTML + """ + years = ["All"] if all_years else [] for year in range(datetime.now().year, 2005, -1): years.append(str(year)) - html = "