diff --git a/src/config.py b/src/config.py index 7f6faa48..c7b6c703 100644 --- a/src/config.py +++ b/src/config.py @@ -96,6 +96,7 @@ (r"/api/metakg/consolidated/?", "handlers.api.MetaKGQueryHandler", {"biothing_type": "metakg_consolidated"}), (r"/api/metakg/consolidated/fields/?", "biothings.web.handlers.MetadataFieldHandler", {"biothing_type": "metakg_consolidated"}), (r"/api/metakg/paths/?", "handlers.api.MetaKGPathFinderHandler", {"biothing_type": "metakgpathfinder"}), + (r"/api/metakg/parse/?", "handlers.api.MetaKGParserHandler"), ] # biothings web tester will read this diff --git a/src/handlers/api.py b/src/handlers/api.py index cc0a89ad..e075103c 100644 --- a/src/handlers/api.py +++ b/src/handlers/api.py @@ -1,8 +1,8 @@ import asyncio import json import logging -from typing import List, Union import os + import bmt from biothings.utils import serializer from biothings.web.auth.authn import BioThingsAuthnMixin @@ -10,17 +10,18 @@ from biothings.web.handlers.query import BiothingHandler, capture_exceptions from biothings.web.settings.default import QUERY_KWARGS from tornado.httpclient import AsyncHTTPClient -from tornado.web import Finish, HTTPError from tornado.template import Loader +from tornado.web import Finish, HTTPError from controller import SmartAPI from controller.exceptions import ControllerError, NotFoundError from pipeline import MetaKGQueryPipeline from utils.downloader import DownloadError, download_async +from utils.metakg.biolink_helpers import get_expanded_values +from utils.metakg.cytoscape_formatter import CytoscapeDataFormatter from utils.metakg.export import edges2graphml +from utils.metakg.parser import MetaKGParser from utils.metakg.path_finder import MetaKGPathFinder -from utils.metakg.cytoscape_formatter import CytoscapeDataFormatter -from utils.metakg.biolink_helpers import get_expanded_values from utils.notification import SlackNewAPIMessage, SlackNewTranslatorAPIMessage logger = logging.getLogger("smartAPI") @@ -382,7 +383,68 @@ def 
post(self): raise HTTPError(400, reason="Missing required form field: id") -class MetaKGQueryHandler(QueryHandler): +class MetaKGHandlerMixin: + """ + Mixin to provide reusable logic for filtering API information. + """ + def get_filtered_api(self, api_dict): + """Extract and return filtered API information.""" + api_info = api_dict.get("api", api_dict) # Handle both formats + + # Default to False if not present + bte = self.args.bte + api_details = self.args.api_details + + # Default structure to preserve top-level keys + filtered_dict = { + key: api_dict.get(key) + for key in ["subject", "object", "predicate", "subject_prefix", "object_prefix"] + if key in api_dict + } + + # Determine filtered API structure based on `bte` and `api_details` + if bte and not api_details: + # When bte is True and api_details is False, include only minimal API info + filtered_api = { + **({"name": api_info.get("name")} if "name" in api_info else {}), + **( + {"smartapi": {"id": api_info.get("smartapi", {}).get("id", None)}} + if "smartapi" in api_info + else {"smartapi": {"id": None}} + ), + "bte": api_info.get("bte", {}), + } + elif api_details: + # When api_details is True, include more detailed information + filtered_api = api_info.copy() + if not bte: + filtered_api.pop("bte", None) + + # Handle case where "ui" key exists and ends with "None" + if filtered_api.get('smartapi', {}).get("ui", "").endswith("/None"): + filtered_api["smartapi"]["ui"] = None + else: + # Default: No bte and no api_details - just minimal API info + filtered_api = { + **({"name": api_info.get("name")} if "name" in api_info else {}), + **( + {"smartapi": {"id": api_info.get("smartapi", {}).get("id", None)}} + if "smartapi" in api_info + else {"smartapi": {"id": None}} + ), + } + + # Add the filtered 'api' key to the preserved top-level structure + filtered_dict["api"] = filtered_api + + # Remove 'bte' from 'api' and move it to the top level + if "bte" in filtered_dict["api"]: + filtered_dict["bte"] = 
filtered_dict["api"].pop("bte") + + return filtered_dict + + +class MetaKGQueryHandler(QueryHandler, MetaKGHandlerMixin): """ Support metakg queries with biolink model's semantic descendants @@ -458,30 +520,8 @@ async def get(self, *args, **kwargs): value_list = get_expanded_values(value_list, self.biolink_model_toolkit) if expanded_fields[field] else value_list setattr(self.args, field, value_list) - await super().get(*args, **kwargs) - def get_filtered_api(self, api_dict): - """Extract and return filtered API information.""" - api_info = api_dict - if not self.args.bte and not self.args.api_details: # no bte and no api details - filtered_api= { - **({"name": api_info["name"]} if "name" in api_info else {}), - **({"smartapi": {"id": api_info["smartapi"]["id"]}} if "smartapi" in api_info and "id" in api_info["smartapi"] else {}) - } - elif self.args.bte and not self.args.api_details : # bte and no api details - filtered_api= { - **({"name": api_info["name"]} if "name" in api_info else {}), - **({"smartapi": {"id": api_info["smartapi"]["id"]}} if "smartapi" in api_info and "id" in api_info["smartapi"] else {}), - 'bte': api_info.get('bte', {}) - } - elif not self.args.bte and self.args.api_details: # no bte and api details - api_info.pop('bte', None) - filtered_api = api_info - else: - filtered_api = api_info - return filtered_api - def process_apis(self, apis): """Process each API dict based on provided args.""" if isinstance(apis, list): @@ -491,11 +531,11 @@ def process_apis(self, apis): elif isinstance(apis, dict): if 'bte' in apis: # update dict for new format - apis['api']['bte']=apis.pop('bte') + apis['api']['bte'] = apis.pop('bte') api_dict = apis["api"] - filtered_api= self.get_filtered_api(api_dict) + filtered_api = self.get_filtered_api(api_dict) apis["api"] = filtered_api - + def write(self, chunk): """ Overwrite the biothings query handler to ... 
@@ -522,10 +562,10 @@ def write(self, chunk): self.set_header("Content-Disposition", 'attachment; filename="smartapi_metakg.graphml"') return super(BaseAPIHandler, self).write(chunk) - + if self.format == "html": # setup template - template_path = os.path.abspath(os.path.join(os.path.dirname( __file__ ), '..', 'templates')) + template_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'templates')) loader = Loader(template_path) template = loader.load("cytoscape.html") # initial counts @@ -542,7 +582,7 @@ def write(self, chunk): graph_data = serializer.to_json(cdf.get_data()) # generate global template variable with graph data result = template.generate( - data= graph_data, + data=graph_data, response=serializer.to_json(chunk), shown=shown, available=available, @@ -586,7 +626,7 @@ class MetaKGPathFinderHandler(QueryHandler): "max": 6, "default": [], "enum": ["subject", "object", "predicate", "node", "edge", "all"] - } + } }, } @@ -680,9 +720,134 @@ async def get(self, *args, **kwargs): raw_query_output = self.setup_pathfinder_rawquery(expanded_fields) self.write(raw_query_output) return - res = { - "total": len(paths_with_edges), - "paths": paths_with_edges, - } + res = { + "total": len(paths_with_edges), + "paths": paths_with_edges, + } await asyncio.sleep(0.01) self.finish(res) + + +class MetaKGParserHandler(BaseHandler, MetaKGHandlerMixin): + """ + Handles parsing of SmartAPI metadata from a given URL or request body. + + This handler processes SmartAPI metadata and returns structured, + cleaned results based on the specified query parameters. + + Supported HTTP methods: + - **GET**: Parses metadata from a provided URL. + - **POST**: Parses metadata from the request body. + + Query Parameters: + - `url` (str, required): The URL of the SmartAPI metadata to parse. + Maximum length: 1000 characters. + - `api_details` (bool, optional, default: `False`): + Whether to return detailed API information. 
+ - `bte` (bool, optional, default: `False`): + Whether to include BTE (BioThings Explorer) specific metadata. + """ + + kwargs = { + "*": { + "api_details": {"type": bool, "default": False}, + "bte": {"type": bool, "default": False}, + }, + "GET": { + "url": { + "type": str, + "required": True, + "max": 1000, + "description": "URL of the SmartAPI metadata to parse" + }, + }, + } + + def initialize(self, *args, **kwargs): + super().initialize(*args, **kwargs) + # change the default query pipeline from self.biothings.pipeline + self.pipeline = MetaKGQueryPipeline(ns=self.biothings) + + def process_apis(self, apis): + """Process each API dict based on provided args.""" + if isinstance(apis, list): + for i, api_dict in enumerate(apis): + filtered_api = self.get_filtered_api(api_dict) + apis[i] = filtered_api + elif isinstance(apis, dict): + if "bte" in apis: + # Update dict for new format + apis["api"]["bte"] = apis.pop("bte") + api_dict = apis["api"] + filtered_api = self.get_filtered_api(api_dict) + apis["api"] = filtered_api + return apis + + async def get(self, *args, **kwargs): + url = self.args.url + parser = MetaKGParser() + + try: + parsed_metakg = parser.get_metakg(url=url) + except DownloadError: + self.write_error(400, reason="There was an error downloading the data from the given url.") + except (ValueError, TypeError) as err: + self.write_error( + status_code=400, + reason="The data retrieved from the given url is not a valid JSON or YAML object.", + message=str(err) + ) + + # Apply filtering -- if data found + if parsed_metakg: + for i, api_dict in enumerate(parsed_metakg): + parsed_metakg[i] = self.get_filtered_api(api_dict) + + # Add url to metadata if api_details is set to 1 + if self.args.api_details: + for data_dict in parsed_metakg: + if "metadata" in data_dict["api"]["smartapi"] and data_dict["api"]["smartapi"]["metadata"] is None: + data_dict["api"]["smartapi"]["metadata"] = url + + response = { + "total": len(parsed_metakg), + "hits": 
parsed_metakg, + } + + self.finish(response) + + async def post(self, *args, **kwargs): + content_type = self.request.headers.get("Content-Type", "").lower() + if content_type in ["application/json", "application/x-yaml"]: + # if content type is set properly, it should have already been parsed + metadata_from_body = self.args_json or self.args_yaml + elif self.request.body: + # if request body is provided but no proper content type is set + # we will parse it as YAML anyway + metadata_from_body = self._parse_yaml() + else: + metadata_from_body = None + + if metadata_from_body: + # Process the parsed metadata + parser = MetaKGParser() + parsed_metakg = parser.get_metakg(metadata_from_body) + + # Apply filtering to the combined data + if parsed_metakg: + for i, api_dict in enumerate(parsed_metakg): + parsed_metakg[i] = self.get_filtered_api(api_dict) + + # Send the response back to the client + response = { + "total": len(parsed_metakg), + "hits": parsed_metakg, + } + + self.finish(response) + else: + self.write_error( + status_code=400, + reason="Request body cannot be empty.", + message="Please provide a valid JSON/YAML object in the request body." 
+ ) diff --git a/src/pipeline.py b/src/pipeline.py index 748a6be8..d1122574 100644 --- a/src/pipeline.py +++ b/src/pipeline.py @@ -2,14 +2,10 @@ from enum import Enum from typing import Dict, OrderedDict -from biothings.web.query import ( - AsyncESQueryBackend, - AsyncESQueryPipeline, - ESQueryBuilder, - ESResultFormatter, -) -from controller.base import OpenAPI, Swagger +from biothings.web.query import AsyncESQueryBackend, AsyncESQueryPipeline, ESQueryBuilder, ESResultFormatter from elasticsearch_dsl import Q, Search + +from controller.base import OpenAPI, Swagger from utils import decoder @@ -219,8 +215,8 @@ def apply_extras(self, search, options): apply extra filters """ # if not options._source: - # by default exclude api.bte or bte field, but can be included by specifying in the fields parameter - # options._source = ["-api.bte", "-bte"] + # by default exclude api.bte or bte field, but can be included by specifying in the fields parameter + # options._source = ["-api.bte", "-bte"] search = super().apply_extras(search, options) # apply extra filters from query parameters @@ -262,6 +258,7 @@ def adjust_index(self, original_index: str, query: str, **options: Dict) -> str: query_index = self.indices.get("metakg", None) return query_index + class MetaKGQueryPipeline(AsyncESQueryPipeline): def __init__(self, *args, **kwargs): # ns is an instance of BiothingsNamespace diff --git a/src/utils/metakg/parser.py b/src/utils/metakg/parser.py index 8abc48df..7dce2f7b 100644 --- a/src/utils/metakg/parser.py +++ b/src/utils/metakg/parser.py @@ -1,6 +1,7 @@ import json import logging from copy import copy +from typing import Dict, List, Optional, Union import requests @@ -13,33 +14,72 @@ class MetaKGParser: get_url_timeout = 60 metakg_errors = None - def get_non_TRAPI_metadatas(self, data, extra_data=None): - parser = API(data) - mkg = self.extract_metakgedges(parser.metadata["operations"], extra_data=extra_data) + def get_metakg(self, + data: Optional[Union[Dict, API]] = 
None, + extra_data: Optional[Dict] = None, + url: Optional[str] = None) -> List[Dict]: + """ + Extract and process metadata from a SmartAPI document or URL. + Returns MetaKG edges or propagates errors. + """ + if not data and not url: + raise ValueError("Either data or url value is expected for this request, please provide data or a url.") + + # if both data and url are provided, prefer data + if data: + _api = data if isinstance(data, API) else API(data) + elif url: + _api = API(url=url) + + if _api.is_trapi: + return self.get_TRAPI_metadatas(data=_api, extra_data=extra_data) + else: + return self.get_non_TRAPI_metadatas(data=_api, extra_data=extra_data) + + def get_non_TRAPI_metadatas(self, data: Union[Dict, API], extra_data: Optional[Dict] = None) -> List[Dict]: + """ + Extract MetaKG edges from a non-TRAPI SmartAPI document provided as `data` (a dict or an API instance). + Raises an error if the parser fails to parse the document. + """ + _api = data if isinstance(data, API) else API(data) + mkg = self.extract_metakgedges(_api.metadata["operations"], extra_data=extra_data) + no_nodes = len({x["subject"] for x in mkg} | {x["object"] for x in mkg}) + no_edges = len({x["predicate"] for x in mkg}) + logger.info("Done [%s nodes, %s edges]", no_nodes, no_edges) + return mkg + + def get_TRAPI_metadatas(self, data: Union[Dict, API], extra_data: Optional[Dict] = None) -> List[Dict]: + """ + Extract and process TRAPI metadata from a SmartAPI document or URL. + Returns MetaKG edges or propagates errors. 
+ """ ops = [] metadata_list = self.get_TRAPI_with_metakg_endpoint(data) count_metadata_list = len(metadata_list) self.metakg_errors = {} + for i, metadata in enumerate(metadata_list): ops.extend(self.get_ops_from_metakg_endpoint(metadata, f"[{i + 1}/{count_metadata_list}]")) + if self.metakg_errors: - cnt_metakg_errors = sum([len(x) for x in self.metakg_errors.values()]) + cnt_metakg_errors = sum(len(x) for x in self.metakg_errors.values()) logger.error(f"Found {cnt_metakg_errors} TRAPI metakg errors:\n {json.dumps(self.metakg_errors, indent=2)}") return self.extract_metakgedges(ops, extra_data=extra_data) - def get_TRAPI_with_metakg_endpoint(self, data): + def get_TRAPI_with_metakg_endpoint(self, data: Union[Dict, API]): + """ + Retrieve TRAPI metadata from a SmartAPI document or URL. + Returns metadata if TRAPI endpoints are found, else an empty list. + """ metadatas = [] - parser = API(data) - metadata = parser.metadata + _api = data if isinstance(data, API) else API(data) + + metadata = _api.metadata _paths = metadata.get("paths", {}) _team = metadata.get("x-translator", {}).get("team") + + # Check for required TRAPI paths if "/meta_knowledge_graph" in _paths and "/query" in _paths and _team: metadatas.append(metadata) return metadatas