diff --git a/.env.dist b/.env.dist index d746340..fa58085 100644 --- a/.env.dist +++ b/.env.dist @@ -3,4 +3,5 @@ OPENROUTER_API_KEY= GITHUB_TOKEN= GITLAB_TOKEN= MODEL= -PROVIDER= \ No newline at end of file +PROVIDER= +SELENIUM_REMOTE_URL= \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 3cf4ca7..fa91dba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,14 @@ All notable changes to this project will be documented in this file. +## [1.0.0] - 2025-08-06 + +### Added +- Users and Organization compatibility +- Endpoints refactoring +- Parallel calling +- Multiworkers entrypoint + ## [0.1.0] - 2025-06-25 ### Added diff --git a/Dockerfile b/Dockerfile index 998b891..e2c3d4c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -15,4 +15,4 @@ COPY . . ENV PYTHONUNBUFFERED=1 -ENTRYPOINT ["uvicorn", "src.api:app", "--host", "0.0.0.0", "--port", "1234"] \ No newline at end of file +ENTRYPOINT ["gunicorn", "src.api:app", "--workers", "4", "--worker-class", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:1234"] \ No newline at end of file diff --git a/README.md b/README.md index b737785..3237c3b 100644 --- a/README.md +++ b/README.md @@ -84,19 +84,19 @@ If no arguments are provided, it will use the default repository and output path 1. You need to build the image. ``` bash - docker build -t llm-software-finder . + docker build -t git-metadata-extractor . ``` 2. Run the image. ``` bash - docker run -it --env-file .env -p 1234:1234 --entrypoint bash llm-software-finder + docker run -it --env-file .env -p 1234:1234 --entrypoint bash git-metadata-extractor ``` If you are developping the application it's useful to mount the app volume. ``` bash - docker run -it --env-file .env -p 1234:1234 -v .:/app --entrypoint bash llm-software-finder + docker run -it --env-file .env -p 1234:1234 -v .:/app --entrypoint bash git-metadata-extractor ``` 3. 
Then you can run the tool via @@ -105,12 +105,18 @@ If no arguments are provided, it will use the default repository and output path python src/main.py --url https://github.com/qchapp/lungs-segmentation --output_path output_file.json ``` +4. Optional. If you are planning to use the ORCID functionality, you need to start a remote browser and configure the `.env` file. + + ``` bash + docker run --rm -d -p 4444:4444 -p 7900:7900 --shm-size="2g" selenium/standalone-firefox + ``` + ## How to develop using Docker? To facilitate the development we can mount the app folder in the docker. By doing this, all changes made in local will be accesible from the running container. ```bash -docker run -it --env-file .env -p 1234:1234 -v .:/app llm-software-finder +docker run -it --env-file .env -p 1234:1234 -v .:/app git-metadata-extractor ``` @@ -119,7 +125,7 @@ docker run -it --env-file .env -p 1234:1234 -v .:/app llm-software-finder Simply run: ``` -docker run -it --env-file .env -p 1234:1234 llm-software-finder +docker run -it --env-file .env -p 1234:1234 git-metadata-extractor ``` and go to `localhost:1234` @@ -128,7 +134,7 @@ and go to `localhost:1234` Or if you are running the container with `bash` as the entrypoint, please execute. ```bash -uvicorn src.api:app --host 0.0.0.0 --port 1234 --reload +uvicorn src.api:app --host 0.0.0.0 --workers 4 --port 1234 --reload ``` `--reload` allows you to modify the files and reload automatically the api endpoint. Excellent for development. 
@@ -138,3 +144,6 @@ uvicorn src.api:app --host 0.0.0.0 --port 1234 --reload Quentin Chappuis - EPFL Center for Imaging Robin Franken - SDSC Carlos Vivar Rios - SDSC / EPFL Center for Imaging + + +docker run --network open-pulse --rm -d -p 4444:4444 -p 7900:7900 --shm-size="2g" selenium/standalone-firefox \ No newline at end of file diff --git a/docs/docker-cleanup-strategy.md b/docs/docker-cleanup-strategy.md deleted file mode 100644 index b92c61a..0000000 --- a/docs/docker-cleanup-strategy.md +++ /dev/null @@ -1,108 +0,0 @@ -# Docker Image Cleanup Strategy - -This document explains how our GitHub Actions workflows manage Docker images to prevent GHCR from getting cluttered with development images. - -## 🏗️ Image Building Strategy - -### Main Workflow (`publish_image_in_GHCR.yaml`) -- **Main branch**: `latest` + version tag (e.g., `0.1.0`) -- **Develop branch**: `develop` tag -- **Pull Requests**: `pr-{number}` tag (e.g., `pr-123`) -- **Feature branches**: `{branch-name}` tag (e.g., `feature-awesome-feature`) - -**Optimization**: Skips building images for draft PRs to reduce unnecessary builds. - -## 🧹 Cleanup Strategy - -### Automatic Cleanup (`cleanup_images.yaml`) - -#### 1. **PR Image Cleanup** -- **Trigger**: When a PR is closed (merged or rejected) -- **Action**: Automatically deletes the `pr-{number}` image -- **Benefit**: No manual intervention needed - -#### 2. **Branch Image Cleanup** -- **Trigger**: When a branch is deleted -- **Action**: Automatically deletes the corresponding branch image -- **Benefit**: Keeps registry clean when feature work is complete - -#### 3. **Scheduled Cleanup** -- **Trigger**: Every Sunday at 2 AM UTC -- **Action**: Deletes development images older than 7 days -- **Protected**: Never deletes `latest` or version-tagged images -- **Configurable**: Can be adjusted via workflow dispatch - -#### 4. 
**Manual Cleanup** -- **Trigger**: Manual workflow dispatch -- **Options**: - - `days_old`: How old images should be before deletion (default: 7) - - `tag_pattern`: Which tag pattern to clean (default: `pr-*`) - -## 🛡️ Protected Images - -The cleanup workflows will **NEVER** delete: -- `latest` tag -- Version tags (e.g., `1.0.0`, `2.1.3`) -- Images newer than the specified age threshold - -## 📊 Cleanup Examples - -```bash -# Images that WILL be cleaned up (after 7 days): -ghcr.io/imaging-plaza/git-metadata-extractor:pr-123 -ghcr.io/imaging-plaza/git-metadata-extractor:feature-new-api -ghcr.io/imaging-plaza/git-metadata-extractor:develop # if older than 7 days - -# Images that will NEVER be cleaned up: -ghcr.io/imaging-plaza/git-metadata-extractor:latest -ghcr.io/imaging-plaza/git-metadata-extractor:0.1.0 -ghcr.io/imaging-plaza/git-metadata-extractor:1.2.3 -``` - -## 🔧 Manual Cleanup Commands - -### Clean all PR images older than 3 days: -1. Go to Actions tab in GitHub -2. Select "Cleanup Development Images" -3. Click "Run workflow" -4. Set `days_old` to `3` and `tag_pattern` to `pr-*` - -### Clean all feature branch images: -1. Same as above -2. Set `tag_pattern` to `feature-*` - -### Clean develop tag if it's old: -1. Same as above -2. Set `tag_pattern` to `develop` - -## 📈 Benefits - -1. **🚀 Automatic**: No manual intervention required for normal workflow -2. **💾 Space-efficient**: Prevents GHCR storage from growing indefinitely -3. **🔒 Safe**: Protected images are never accidentally deleted -4. **⚙️ Configurable**: Can adjust retention policies as needed -5. **🎯 Targeted**: Can clean specific types of images when needed - -## 🚨 Monitoring - -You can monitor the cleanup by: -1. Checking the Actions tab for cleanup workflow runs -2. Looking at your GHCR package page to see active images -3. 
The cleanup logs show exactly what was deleted - -## 💡 Customization - -To adjust the cleanup behavior: - -### Change default retention period: -Edit `cleanup_images.yaml` line with `default: '7'` to your preferred number of days. - -### Change cleanup schedule: -Edit the cron expression in `cleanup_images.yaml`: -```yaml -schedule: - - cron: '0 2 * * 0' # Weekly on Sunday at 2 AM -``` - -### Add more protected patterns: -Modify the `hasProtectedTag` logic in the cleanup script to protect additional tag patterns. diff --git a/pyproject.toml b/pyproject.toml index 92ce35e..91bb9bd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "git-metadata-extractor" -version = "0.1.0" +version = "1.0.0" description = "This project is designed to classify imaging software repositories and extract relevant information using AI models." readme = "README.md" requires-python = ">=3.9" @@ -41,6 +41,10 @@ dependencies = [ "rdflib==6.2.0", "rdflib-jsonld==0.6.2", "PyYAML==6.0.2", + "selenium==4.34.2", + "beautifulsoup4==4.13.4", + "aiohttp==3.12.15", + "uvicorn-worker==0.3.0" ] [project.urls] diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index c05a0c7..0000000 --- a/requirements.txt +++ /dev/null @@ -1,13 +0,0 @@ -pydantic -python-dotenv -google-genai -repo-to-text -PyLD -rdflib -rdflib-jsonld -tiktoken -fastapi -uvicorn -gimie==0.7.2 -pyyaml -openai \ No newline at end of file diff --git a/src/api.py b/src/api.py index 6b4f6a6..432bb19 100644 --- a/src/api.py +++ b/src/api.py @@ -3,16 +3,19 @@ import os from .core.gimie_methods import extract_gimie from .core.models import convert_jsonld_to_pydantic, convert_pydantic_to_zod_form_dict -from .core.genai_model import llm_request_repo_infos +from .core.genai_model import llm_request_repo_infos, llm_request_userorg_infos +from .core.users_parser import parse_github_user +from .core.orgs_parser import parse_github_organization from 
.utils.utils import merge_jsonld +from pprint import pprint app = FastAPI() @app.get("/") def index(): - return {"title": "Hello, welcome to the Git Metadata Extractor v0.1.0. Gimie Version 0.7.2. "} + return {"title": f"Hello, welcome to the Git Metadata Extractor v0.2.0. Gimie Version 0.7.2. LLM Model {os.environ['MODEL']}"} @app.get("/v1/extract/json/{full_path:path}") async def extract(full_path:str): @@ -20,16 +23,14 @@ async def extract(full_path:str): jsonld_gimie_data = extract_gimie(full_path, format="json-ld") try: - llm_result = llm_request_repo_infos(str(full_path)) - except Exception as e: - raise HTTPException( - status_code=424, - detail=f"Error from LLM service: {e}" - ) + llm_result = await llm_request_repo_infos(str(full_path), output_format="json-ld", max_tokens=30000) + merged_results = merge_jsonld(jsonld_gimie_data, llm_result) + pydantic_data = convert_jsonld_to_pydantic(merged_results["@graph"]) - merged_results = merge_jsonld(jsonld_gimie_data, llm_result) + except Exception as e: - pydantic_data = convert_jsonld_to_pydantic(merged_results["@graph"]) + pydantic_data = convert_jsonld_to_pydantic(jsonld_gimie_data["@graph"]) + print(f"Warning: LLM service failed, using fallback data: {e}") zod_data = convert_pydantic_to_zod_form_dict(pydantic_data) @@ -37,12 +38,12 @@ async def extract(full_path:str): "output": zod_data} @app.get("/v1/extract/json-ld/{full_path:path}") -async def extract(full_path:str): +async def extract_jsonld(full_path:str): jsonld_gimie_data = extract_gimie(full_path, format="json-ld") try: - llm_result = llm_request_repo_infos(str(full_path)) + llm_result = await llm_request_repo_infos(str(full_path), max_tokens=20000) except Exception as e: raise HTTPException( status_code=424, @@ -53,26 +54,82 @@ async def extract(full_path:str): return {"link": full_path, "output": merged_results} + +@app.get("/v1/org/llm/json/{full_path:path}") +async def get_org_json(full_path: str): + + try: + org_metadata = 
parse_github_organization(full_path.split("/")[-1]) + + parsed_org_metadata = await llm_request_userorg_infos(org_metadata, item_type="org") + + org_metadata_dict = org_metadata.model_dump() + org_metadata_dict.update(parsed_org_metadata) + + except Exception as e: + raise HTTPException( + status_code=424, + detail=f"Error from Organization JSON service: {e}" + ) + + return {"link": full_path, + "output": org_metadata_dict} + +@app.get("/v1/user/llm/json/{full_path:path}") +async def get_user_json(full_path: str): + + try: + user_metadata = parse_github_user(full_path.split("/")[-1]) + + parsed_user_metadata = await llm_request_userorg_infos(user_metadata, item_type="user") + + user_metadata_dict = user_metadata.model_dump() + + user_metadata_dict.update(parsed_user_metadata) + + except Exception as e: + raise HTTPException( + status_code=424, + detail=f"Error from Get User service: {e}" + ) + + return {"link": full_path, + "output": user_metadata_dict} -@app.get("/v1/gimie/{full_path:path}") -async def gimie(full_path:str, - format:str = "json-ld"): +@app.get("/v1/repository/gimie/json-ld/{full_path:path}") +async def gimie(full_path:str): try: - gimie_output = extract_gimie(full_path, format=format) + gimie_output = extract_gimie(full_path, format="json-ld") except Exception as e: raise HTTPException( - status_code=424, #? 
- detail=f"Error from LLM service: {e}" + status_code=424, + detail=f"Error from Gimie service: {e}" ) return {"link": full_path, "output": gimie_output} -@app.get("/v1/llm/{full_path:path}") -async def llm(full_path:str): +@app.get("/v1/repository/llm/json-ld/{full_path:path}") +async def llm_jsonld(full_path:str): + + try: + llm_result = await llm_request_repo_infos(str(full_path), max_tokens=20000) + except Exception as e: + raise HTTPException( + status_code=424, + detail=f"Error from LLM service: {e}" + ) + + return {"link": full_path, + "output": llm_result} + +@app.get("/v1/repository/llm/json/{full_path:path}") +async def llm_json(full_path:str): + + jsonld_gimie_data = extract_gimie(full_path, format="json-ld") try: - llm_result = llm_request_repo_infos(str(full_path)) + llm_result = await llm_request_repo_infos(str(full_path), gimie_output=jsonld_gimie_data, output_format="json", max_tokens=20000) except Exception as e: raise HTTPException( status_code=424, diff --git a/src/core/genai_model.py b/src/core/genai_model.py index e4fcccd..d2e0969 100644 --- a/src/core/genai_model.py +++ b/src/core/genai_model.py @@ -1,16 +1,16 @@ import os import tempfile +import asyncio import subprocess import glob -import requests +import aiohttp import tiktoken import logging from dotenv import load_dotenv -from pprint import pprint -import openai +from openai import AsyncOpenAI -from .prompts import system_prompt_json -from .models import SoftwareSourceCode +from .prompts import system_prompt_json, system_prompt_user_content, system_prompt_org_content +from .models import SoftwareSourceCode, GitHubOrganization, GitHubUser from ..utils.utils import * from .verification import Verification @@ -21,6 +21,9 @@ MODEL = os.environ["MODEL"] PROVIDER = os.environ["PROVIDER"] +# Create async OpenAI client +async_openai_client = AsyncOpenAI(api_key=os.environ.get("OPENAI_API_KEY")) + # Setup logger logger = logging.getLogger(__name__) @@ -50,6 +53,7 @@ def 
sort_files_by_priority(file_paths): """ priority_order = { # Priority 0: Documentation + ".cff":0, ".md": 0, ".txt": 0, ".html": 0, @@ -95,96 +99,150 @@ def store_combined_text(input_text, output_file): return output_file -def clone_repo(repo_url): +async def clone_repo(repo_url, temp_dir): """ - Clone a GitHub repository into a temporary directory. + Clone a GitHub repository into a temporary directory asynchronously. """ - with tempfile.TemporaryDirectory() as temp_dir: - logger.info(f"Cloning {repo_url} into {temp_dir}...") - try: - subprocess.run(["git", "clone", repo_url, temp_dir], check=True) + logger.info(f"Cloning {repo_url} into {temp_dir}...") + try: + process = await asyncio.create_subprocess_exec( + 'git', 'clone', '-c', 'core.symlinks=false', repo_url, temp_dir, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + stdout, stderr = await process.communicate() + + if process.returncode == 0: logger.info("Repository cloned successfully.") return temp_dir - except subprocess.CalledProcessError as e: - logger.error(f"Failed to clone repository: {e}") + else: + logger.error(f"Failed to clone repository: {stderr.decode()}") return None + except Exception as e: + logger.error(f"Failed to clone repository: {e}") + return None + +async def run_repo_to_text(temp_dir): + """ + Run the repo-to-text command asynchronously. + """ + try: + process = await asyncio.create_subprocess_exec( + 'repo-to-text', + cwd=temp_dir, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + stdout, stderr = await process.communicate() + + if process.returncode == 0: + logger.info("repo-to-text command completed successfully.") + return True + else: + logger.error(f"'repo-to-text' command failed: {stderr.decode()}") + return False + except Exception as e: + logger.error(f"'repo-to-text' command failed: {e}") + return False + +def sanitize_special_tokens(text): + """ + Remove special tokens using tiktoken encoding/decoding. 
+ """ + encoding = tiktoken.get_encoding("cl100k_base") + + # Encode with disallowed_special=() to handle special tokens + # Then decode to get clean text + try: + tokens = encoding.encode(text, disallowed_special=()) + clean_text = encoding.decode(tokens) + return clean_text + except Exception as e: + logger.warning(f"Failed to sanitize with tiktoken: {e}") + # Fallback to simple regex cleanup + import re + return re.sub(r'<\|[^|]*\|>', '', text) -def llm_request_repo_infos(repo_url): +async def llm_request_repo_infos(repo_url, output_format="json-ld", gimie_output=None, max_tokens=40000): + """ + Async version of llm_request_repo_infos + """ # Clone the GitHub repository into a temporary folder with tempfile.TemporaryDirectory() as temp_dir: - logger.info(f"Cloning {repo_url} into {temp_dir}...") - try: - subprocess.run(["git", "clone", repo_url, temp_dir], check=True) - except subprocess.CalledProcessError as e: - logger.error(f"Failed to clone repository: {e}") + # Clone repository asynchronously + clone_result = await clone_repo(repo_url, temp_dir) + if not clone_result: return None - # Run the repo-to-text command in the repository directory - try: - subprocess.run(["repo-to-text"], cwd=temp_dir, check=True) - except subprocess.CalledProcessError as e: - logger.error(f"'repo-to-text' command failed: {e}") + # Run repo-to-text asynchronously + repo_to_text_success = await run_repo_to_text(temp_dir) + if not repo_to_text_success: return None input_text = combine_text_files(temp_dir) - input_text = reduce_input_size(input_text, max_tokens=80000) + input_text = sanitize_special_tokens(input_text) + input_text = reduce_input_size(input_text, max_tokens=max_tokens) + + if gimie_output: + input_text += "\n\n" + str(gimie_output) combined_file_path = os.path.join(temp_dir, "combined_repo.txt") store_combined_text(input_text, combined_file_path) - if PROVIDER == "openrouter": - response = get_openrouter_response(input_text, model=MODEL) + response = await 
get_openrouter_response_async(input_text, model=MODEL) elif PROVIDER == "openai": - response = get_openai_response(input_text, model=MODEL) + response = await get_openai_response_async(input_text, model=MODEL) else: logger.error("No provider provided") + return None - if response.status_code == 200: - try: - raw_result = response.json()["choices"][0]["message"]["content"] + try: + if PROVIDER == "openrouter": + raw_result = response["choices"][0]["message"]["content"] parsed_result = clean_json_string(raw_result) json_data = json.loads(parsed_result) - pprint(json_data) + elif PROVIDER == "openai": + json_data = response.choices[0].message.parsed + logger.info("Clean result from OpenAI response:") + json_data = json_data.model_dump(mode='json') - logger.info("Successfully parsed API response") + logger.info("Successfully JSON API response") - # Run verification before converting to JSON-LD - verifier = Verification(json_data) - verifier.run() - verifier.summary() + # Run verification before converting to JSON-LD + verifier = Verification(json_data) + verifier.run() + verifier.summary() - # Sanitize metadata before conversion - cleaned_json = verifier.sanitize_metadata() + cleaned_json = verifier.sanitize_metadata() - # TODO. This is hardcoded. Not good. 
- context_path = "src/files/json-ld-context.json" - # Now convert cleaned data to JSON-LD + context_path = "src/files/json-ld-context.json" + if output_format == "json-ld": return json_to_jsonLD(cleaned_json, context_path) - - except Exception as e: - logger.error(f"Error parsing response: {e}") + elif output_format == "json": + return cleaned_json + else: + logger.error(f"Unsupported output format: {output_format}") return None - else: - logger.error(f"API Error: {response.status_code} - {response.text}") - return None + except Exception as e: + logger.error(f"Error parsing response: {e}") + return None -def get_openrouter_response(input_text, model="google/gemini-2.5-flash", temperature=0.1): +async def get_openrouter_response_async(input_text, system_prompt=system_prompt_json, model="google/gemini-2.5-flash", temperature=0.2, schema=SoftwareSourceCode): """ - Get structured response from openrouter + Get structured response from openrouter asynchronously """ - # Prepare payload for OpenRouter API payload = { "model": model, "messages": [ - {"role": "system", "content": system_prompt_json}, + {"role": "system", "content": system_prompt}, {"role": "user", "content": input_text} ], "response_format": { "type": "json_schema", - "json_schema": SoftwareSourceCode.model_json_schema() + "json_schema": schema.model_json_schema() }, "temperature": temperature } @@ -194,39 +252,147 @@ def get_openrouter_response(input_text, model="google/gemini-2.5-flash", tempera "Content-Type": "application/json" } + timeout = aiohttp.ClientTimeout(total=300) # 5 minute timeout + + for attempt in range(3): + try: + async with aiohttp.ClientSession(timeout=timeout) as session: + async with session.post(OPENROUTER_ENDPOINT, headers=headers, json=payload) as response: + logger.info(f"API response status: {response.status}") + if response.status == 200: + return await response.json() + else: + logger.error(f"API request failed with status {response.status}") + if attempt == 2: # Last attempt 
+ return None + except aiohttp.ClientError as e: + logger.error(f"Request failed (attempt {attempt + 1}): {e}") + if attempt == 2: # Last attempt + return None + except asyncio.TimeoutError as e: + logger.error(f"Request timeout (attempt {attempt + 1}): {e}") + if attempt == 2: # Last attempt + return None + + return None - # Send request to OpenRouter +async def get_openai_response_async(prompt, system_prompt=system_prompt_json, model="gpt-4o", temperature=0.2, schema=SoftwareSourceCode): + """ + Get structured response from OpenAI API using SoftwareSourceCode schema asynchronously. + """ + try: + # Use the async OpenAI client + if model.split("-")[0] == "o3": + response = await async_openai_client.beta.chat.completions.parse( + model=model, + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": prompt} + ], + response_format=convert_httpurl_to_str(schema) + ) + else: + response = await async_openai_client.beta.chat.completions.parse( + model=model, + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": prompt} + ], + temperature=temperature, + response_format=convert_httpurl_to_str(schema) + ) - n = 3 - while n != 0: - try: - response = requests.post(OPENROUTER_ENDPOINT, headers=headers, json=payload) - logger.info(f"API response status: {response.status_code}") - n = 0 - except requests.exceptions.RequestException as e: - logger.error(f"Request failed: {e}") - n -= 1 - return None - - return response + return response + except Exception as e: + logger.error(f"OpenAI API error: {e}") + return None -def get_openai_response(prompt, model="gpt-4o", temperature=0.1): +async def llm_request_userorg_infos(metadata, item_type="user"): """ - Get structured response from OpenAI API using SoftwareSourceCode schema. 
+ Async version of llm_request_userorg_infos """ + input_text = metadata.model_dump_json() + + if item_type == "user": + schema = GitHubUser + system_prompt = system_prompt_user_content + elif item_type == "org": + schema = GitHubOrganization + system_prompt = system_prompt_org_content + + if PROVIDER == "openrouter": + response = await get_openrouter_response_async(input_text, + system_prompt=system_prompt, + model=MODEL, + schema=schema) + elif PROVIDER == "openai": + response = await get_openai_response_async(input_text, + system_prompt=system_prompt, + model=MODEL, + schema=schema) + else: + logger.error("No provider provided") + return None + try: - response = openai.beta.chat.completions.parse( - model=model, - messages=[ - {"role": "system", "content": "You are a helpful assistant. Respond in JSON format."}, - {"role": "user", "content": prompt} - ], - temperature=temperature, - response_format=convert_httpurl_to_str(SoftwareSourceCode) - ) + if PROVIDER == "openrouter": + raw_result = response["choices"][0]["message"]["content"] + parsed_result = clean_json_string(raw_result) + json_data = json.loads(parsed_result) + elif PROVIDER == "openai": + json_data = response.choices[0].message.parsed + json_data = json_data.model_dump(mode='json') + else: + logger.error("Unknown provider") + return None + + logger.info("Successfully parsed API response") + return json_data + + except Exception as e: + logger.error(f"Error parsing response: {e}") + return None + +# Keep the synchronous versions for backward compatibility +def get_openrouter_response(input_text, system_prompt=system_prompt_json, model="google/gemini-2.5-flash", temperature=0.2, schema=SoftwareSourceCode): + """ + Synchronous wrapper for backward compatibility + """ + import asyncio + return asyncio.run(get_openrouter_response_async(input_text, system_prompt, model, temperature, schema)) + +def get_openai_response(prompt, system_prompt=system_prompt_json, model="gpt-4o", temperature=0.2, 
schema=SoftwareSourceCode): + """ + Synchronous wrapper for backward compatibility + """ + from openai import OpenAI + + sync_client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY")) + + try: + if model.split("-")[0] == "o3": + response = sync_client.beta.chat.completions.parse( + model=model, + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": prompt} + ], + response_format=convert_httpurl_to_str(schema) + ) + else: + response = sync_client.beta.chat.completions.parse( + model=model, + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": prompt} + ], + temperature=temperature, + response_format=convert_httpurl_to_str(schema) + ) return response + except Exception as e: logger.error(f"OpenAI API error: {e}") - return None \ No newline at end of file + return None diff --git a/src/core/gimie_methods.py b/src/core/gimie_methods.py index b31c178..56dd183 100644 --- a/src/core/gimie_methods.py +++ b/src/core/gimie_methods.py @@ -3,15 +3,16 @@ def extract_gimie(full_path: str, format: str = "json-ld"): """ - Extracts the GIMIE project from the given path. - + Extracts the GIMIE project from the given URL. + Args: - full_path (str): The full path to the GIMIE project. + full_path (str): The full path to the URL. format (str): The format to serialize the graph. Default is 'json-ld', or 'ttl'. Returns: Project: The GIMIE project object. 
""" + print(full_path) proj = Project(full_path) diff --git a/src/core/models.py b/src/core/models.py index 7c4aec6..34049ca 100644 --- a/src/core/models.py +++ b/src/core/models.py @@ -57,6 +57,62 @@ class Image(BaseModel): contentUrl: HttpUrl = None keywords: ImageKeyword = ImageKeyword.ILLUSTRATIVE_IMAGE + +class Discipline(str, Enum): + SOCIAL_SCIENCES = "Social sciences" + ANTHROPOLOGY = "Anthropology" + COMMUNICATION_STUDIES = "Communication studies" + EDUCATION = "Education" + LINGUISTICS = "Linguistics" + RESEARCH = "Research" + SOCIOLOGY = "Sociology" + GEOGRAPHY = "Geography" + PSYCHOLOGY = "Psychology" + POLITICS = "Politics" + ECONOMICS = "Economics" + APPLIED_SCIENCES = "Applied sciences" + HEALTH_SCIENCES = "Health sciences" + ELECTRICAL_ENGINEERING = "Electrical engineering" + CHEMICAL_ENGINEERING = "Chemical engineering" + CIVIL_ENGINEERING = "Civil engineering" + ARCHITECTURE = "Architecture" + COMPUTER_ENGINEERING = "Computer engineering" + ENERGY_ENGINEERING = "Energy engineering" + MILITARY_SCIENCE = "Military science" + INDUSTRIAL_PRODUCTION_ENGINEERING = "Industrial and production engineering" + MECHANICAL_ENGINEERING = "Mechanical engineering" + BIOLOGICAL_ENGINEERING = "Biological engineering" + ENVIRONMENTAL_SCIENCE = "Environmental science" + SYSTEMS_SCIENCE_ENGINEERING = "Systems science and engineering" + INFORMATION_ENGINEERING = "Information engineering" + AGRICULTURAL_FOOD_SCIENCES = "Agricultural and food sciences" + BUSINESS = "Business" + HUMANITIES = "Humanities" + HISTORY = "History" + LITERATURE = "Literature" + ART = "Art" + RELIGION = "Religion" + PHILOSOPHY = "Philosophy" + LAW = "Law" + FORMAL_SCIENCES = "Formal sciences" + MATHEMATICS = "Mathematics" + LOGIC = "Logic" + STATISTICS = "Statistics" + THEORETICAL_COMPUTER_SCIENCE = "Theoretical computer science" + NATURAL_SCIENCES = "Natural sciences" + PHYSICS = "Physics" + ASTRONOMY = "Astronomy" + BIOLOGY = "Biology" + CHEMISTRY = "Chemistry" + EARTH_SCIENCE = "Earth 
science" + +class RepositoryType(str, Enum): + SOFTWARE = "software" + EDUCATIONAL_RESOURCE = "educational resource" + DOCUMENTATION = "documentation" + DATA = "data" + OTHER = "other" + class SoftwareSourceCode(BaseModel): name: Optional[str] = None applicationCategory: Optional[List[str]] = None @@ -74,6 +130,7 @@ class SoftwareSourceCode(BaseModel): license: Annotated[str, StringConstraints(pattern=r"spdx\.org.*")] = None author: List[Union[Person, Organization]] = None relatedToOrganization: Optional[List[str]] = None + relatedToOrganizationJustification: Optional[List[str]] = None operatingSystem: Optional[List[str]] = None programmingLanguage: Optional[List[str]] = None softwareRequirements: Optional[List[str]] = None @@ -94,6 +151,35 @@ class SoftwareSourceCode(BaseModel): imagingModality: Optional[List[str]] = None fairLevel: Optional[str] = None graph: Optional[str] = None + discipline: Optional[List[Discipline]] = None + disciplineJustification: Optional[List[str]] = None + repositoryType: Optional[RepositoryType] = None + repositoryTypeJustification: Optional[List[str]] = None + +############################################################ +# +# Github Users and Organizations Models +# +############################################################ + +class GitHubOrganization(BaseModel): + name: Optional[str] = None + organizationType: Optional[str] = None + organizationTypeJustification: Optional[str] = None + description: Optional[str] = None + relatedToOrganization: Optional[List[str]] = None + relatedToOrganizationJustification: Optional[List[str]] = None + discipline: Optional[List[Discipline]] = None + disciplineJustification: Optional[List[str]] = None + +class GitHubUser(BaseModel): + name: Optional[str] = None + relatedToOrganization: Optional[List[str]] = None + relatedToOrganizationJustification: Optional[List[str]] = None + discipline: Optional[List[Discipline]] = None + disciplineJustification: Optional[List[str]] = None + position: 
Optional[List[str]] = None + positionJustification: Optional[List[str]] = None ############################################################ diff --git a/src/core/orgs_parser.py b/src/core/orgs_parser.py new file mode 100644 index 0000000..3418a01 --- /dev/null +++ b/src/core/orgs_parser.py @@ -0,0 +1,396 @@ +import requests +import json +import base64 +from typing import Dict, List, Optional, Any +from pydantic import BaseModel, Field, validator +from datetime import datetime +import os +from dotenv import load_dotenv + +load_dotenv() + +GITHUB_TOKEN = os.environ["GITHUB_TOKEN"] + + +class GitHubOrganizationMetadata(BaseModel): + """Pydantic model to store GitHub organization metadata with validation""" + login: str = Field(..., description="Organization username/login") + name: Optional[str] = Field(None, description="Organization's display name") + description: Optional[str] = Field(None, description="Organization's description") + email: Optional[str] = Field(None, description="Organization's public email") + location: Optional[str] = Field(None, description="Organization's location") + company: Optional[str] = Field(None, description="Organization's company") + blog: Optional[str] = Field(None, description="Organization's blog URL") + twitter_username: Optional[str] = Field(None, description="Twitter username") + public_repos: int = Field(..., ge=0, description="Number of public repositories") + public_gists: int = Field(..., ge=0, description="Number of public gists") + followers: int = Field(..., ge=0, description="Number of followers") + following: int = Field(..., ge=0, description="Number of users following") + created_at: str = Field(..., description="Organization creation date") + updated_at: str = Field(..., description="Last organization update date") + avatar_url: str = Field(..., description="Avatar image URL") + html_url: str = Field(..., description="GitHub organization URL") + gravatar_id: Optional[str] = Field(None, description="Gravatar ID") + 
type: str = Field(..., description="Type (should be 'Organization')") + node_id: str = Field(..., description="GraphQL node ID") + url: str = Field(..., description="API URL") + repos_url: str = Field(..., description="Repositories API URL") + events_url: str = Field(..., description="Events API URL") + hooks_url: str = Field(..., description="Hooks API URL") + issues_url: str = Field(..., description="Issues API URL") + members_url: str = Field(..., description="Members API URL") + + # Additional metadata + public_members: List[str] = Field(default_factory=list, description="Public members") + repositories: List[str] = Field(default_factory=list, description="Repository names") + teams: List[str] = Field(default_factory=list, description="Team names") + readme_url: Optional[str] = Field(None, description="Profile README URL if exists") + readme_content: Optional[str] = Field(None, description="Profile README content if exists") + social_accounts: List[Dict[str, str]] = Field(default_factory=list, description="Social media accounts") + pinned_repositories: List[Dict[str, Any]] = Field(default_factory=list, description="Pinned repositories") + + @validator('email') + def validate_email(cls, v): + """Basic email validation""" + if v is not None and v != "" and '@' not in v: + raise ValueError('Invalid email format') + return v + + class Config: + """Pydantic configuration""" + validate_assignment = True + extra = "forbid" + + +class GitHubOrganizationsParser: + """Parser for GitHub organization metadata using REST and GraphQL APIs""" + + def __init__(self): + """Initialize the parser with GitHub token for higher rate limits""" + self.github_token = GITHUB_TOKEN + self.rest_base_url = "https://api.github.com" + self.graphql_url = "https://api.github.com/graphql" + + self.headers = { + "Accept": "application/vnd.github.v3+json", + "User-Agent": "GitHubOrganizationsParser/1.0" + } + + if self.github_token: + self.headers["Authorization"] = f"token {self.github_token}" + 
+ def get_organization_metadata(self, org_name: str) -> GitHubOrganizationMetadata: + """ + Retrieve comprehensive organization metadata from GitHub + + Args: + org_name: GitHub organization name + + Returns: + GitHubOrganizationMetadata object with all available organization information + + Raises: + requests.RequestException: If API calls fail + ValueError: If organization not found + """ + # Get basic organization data from REST API + rest_data = self._get_rest_organization_data(org_name) + + # Get extended data from GraphQL API (social accounts and pinned repos) + graphql_data = self._get_graphql_organization_data(org_name) + + # Get public members + public_members = self._get_organization_public_members(org_name) + + # Get repositories (limited to first 100 for performance) + repositories = self._get_organization_repositories(org_name) + + # Get teams (if accessible) + teams = self._get_organization_teams(org_name) + + # Check for README and get content + readme_data = self._get_organization_readme(org_name) + + # Combine all data and create Pydantic model + org_data = { + "login": rest_data["login"], + "name": rest_data.get("name"), + "description": rest_data.get("description"), + "email": rest_data.get("email"), + "location": rest_data.get("location"), + "company": rest_data.get("company"), + "blog": rest_data.get("blog"), + "twitter_username": rest_data.get("twitter_username"), + "public_repos": rest_data["public_repos"], + "public_gists": rest_data["public_gists"], + "followers": rest_data["followers"], + "following": rest_data["following"], + "created_at": rest_data["created_at"], + "updated_at": rest_data["updated_at"], + "avatar_url": rest_data["avatar_url"], + "html_url": rest_data["html_url"], + "gravatar_id": rest_data.get("gravatar_id"), + "type": rest_data["type"], + "node_id": rest_data["node_id"], + "url": rest_data["url"], + "repos_url": rest_data["repos_url"], + "events_url": rest_data["events_url"], + "hooks_url": rest_data["hooks_url"], + 
"issues_url": rest_data["issues_url"], + "members_url": rest_data["members_url"], + "public_members": public_members, + "repositories": repositories, + "teams": teams, + "readme_url": readme_data.get("url"), + "readme_content": readme_data.get("content"), + "social_accounts": graphql_data.get("social_accounts", []), + "pinned_repositories": graphql_data.get("pinned_repositories", []) + } + + return GitHubOrganizationMetadata(**org_data) + + def _get_rest_organization_data(self, org_name: str) -> Dict[str, Any]: + """Get basic organization data from REST API""" + url = f"{self.rest_base_url}/orgs/{org_name}" + response = requests.get(url, headers=self.headers) + + if response.status_code == 404: + raise ValueError(f"Organization '{org_name}' not found") + + response.raise_for_status() + return response.json() + + def _get_graphql_organization_data(self, org_name: str) -> Dict[str, Any]: + """Get extended organization data from GraphQL API including social accounts and pinned repos""" + query = """ + query($org_name: String!) { + organization(login: $org_name) { + socialAccounts(first: 10) { + nodes { + provider + url + displayName + } + } + pinnedItems(first: 6, types: REPOSITORY) { + nodes { + ... 
on Repository { + name + description + url + stargazerCount + forkCount + primaryLanguage { + name + color + } + isPrivate + updatedAt + } + } + } + } + } + """ + + variables = {"org_name": org_name} + + payload = { + "query": query, + "variables": variables + } + + headers = self.headers.copy() + headers["Content-Type"] = "application/json" + + response = requests.post( + self.graphql_url, + headers=headers, + data=json.dumps(payload) + ) + + if response.status_code != 200: + return {"social_accounts": [], "pinned_repositories": []} + + data = response.json() + + if "errors" in data: + return {"social_accounts": [], "pinned_repositories": []} + + org_data = data["data"]["organization"] + if not org_data: + return {"social_accounts": [], "pinned_repositories": []} + + # Extract social accounts + social_accounts = [] + if org_data.get("socialAccounts") and org_data["socialAccounts"].get("nodes"): + for account in org_data["socialAccounts"]["nodes"]: + social_accounts.append({ + "provider": account["provider"], + "url": account["url"], + "display_name": account.get("displayName", "") + }) + + # Extract pinned repositories + pinned_repositories = [] + if org_data.get("pinnedItems") and org_data["pinnedItems"].get("nodes"): + for repo in org_data["pinnedItems"]["nodes"]: + pinned_repo = { + "name": repo["name"], + "description": repo.get("description"), + "url": repo["url"], + "stargazer_count": repo["stargazerCount"], + "fork_count": repo["forkCount"], + "is_private": repo["isPrivate"], + "updated_at": repo["updatedAt"] + } + + if repo.get("primaryLanguage"): + pinned_repo["primary_language"] = { + "name": repo["primaryLanguage"]["name"], + "color": repo["primaryLanguage"]["color"] + } + + pinned_repositories.append(pinned_repo) + + return { + "social_accounts": social_accounts, + "pinned_repositories": pinned_repositories + } + + def _get_organization_public_members(self, org_name: str) -> List[str]: + """Get organization's public members""" + url = 
f"{self.rest_base_url}/orgs/{org_name}/public_members" + response = requests.get(url, headers=self.headers) + + if response.status_code != 200: + return [] + + members_data = response.json() + return [member["login"] for member in members_data] + + def _get_organization_repositories(self, org_name: str, limit: int = 100) -> List[str]: + """Get organization's repositories (limited for performance)""" + url = f"{self.rest_base_url}/orgs/{org_name}/repos" + params = {"per_page": limit, "sort": "updated"} + response = requests.get(url, headers=self.headers, params=params) + + if response.status_code != 200: + return [] + + repos_data = response.json() + return [repo["name"] for repo in repos_data] + + def _get_organization_teams(self, org_name: str) -> List[str]: + """Get organization's teams (requires organization membership)""" + url = f"{self.rest_base_url}/orgs/{org_name}/teams" + response = requests.get(url, headers=self.headers) + + if response.status_code != 200: + # This is expected for external users who can't see teams + return [] + + teams_data = response.json() + return [team["name"] for team in teams_data] + + def _get_organization_readme(self, org_name: str) -> Dict[str, Optional[str]]: + """Get organization's README URL and content if it exists""" + # Organizations can have a README in a special repository named .github + # Try to get README from the .github repository + readme_paths = [ + "profile/README.md", + "README.md" + ] + + for readme_path in readme_paths: + url = f"{self.rest_base_url}/repos/{org_name}/.github/contents/{readme_path}" + response = requests.get(url, headers=self.headers) + + if response.status_code == 200: + readme_data = response.json() + content = self._decode_readme_content(readme_data.get("content", "")) + return { + "url": f"https://github.com/{org_name}/.github/blob/main/{readme_path}", + "content": content + } + + # Try master branch as fallback + for readme_path in readme_paths: + url = 
f"{self.rest_base_url}/repos/{org_name}/.github/contents/{readme_path}" + params = {"ref": "master"} + response = requests.get(url, headers=self.headers, params=params) + + if response.status_code == 200: + readme_data = response.json() + content = self._decode_readme_content(readme_data.get("content", "")) + return { + "url": f"https://github.com/{org_name}/.github/blob/master/{readme_path}", + "content": content + } + + return {"url": None, "content": None} + + def _decode_readme_content(self, encoded_content: str) -> Optional[str]: + """Decode base64 encoded README content""" + if not encoded_content: + return None + + try: + # GitHub API returns content in base64 format + decoded_bytes = base64.b64decode(encoded_content) + return decoded_bytes.decode('utf-8') + except Exception as e: + print(f"Warning: Could not decode README content: {e}") + return None + + +def is_it_github_organization(org_name: str) -> bool: + """ + Check if the given name is a valid GitHub organization. + + Args: + org_name: GitHub organization name to check + + Returns: + True if organization exists, False otherwise + """ + parser = GitHubOrganizationsParser() + try: + parser.get_organization_metadata(org_name) + return True + except ValueError: + return False + except requests.RequestException as e: + print(f"API Error: {e}") + return False + + +def parse_github_organization(org_name: str) -> GitHubOrganizationMetadata: + """ + Parse GitHub organization metadata + + Args: + org_name: GitHub organization name + + Returns: + GitHubOrganizationMetadata object with all available information + """ + parser = GitHubOrganizationsParser() + + try: + # Get organization metadata + org_metadata = parser.get_organization_metadata(org_name) + + + # Export to JSON + # print("\nJSON representation:") + # print(json.dumps(org_metadata.dict(), indent=2)) + + return org_metadata + + except ValueError as e: + print(f"Error: {e}") + raise + except requests.RequestException as e: + print(f"API Error: {e}") + 
raise \ No newline at end of file diff --git a/src/core/prompts.py b/src/core/prompts.py index 2a047b1..560fba7 100644 --- a/src/core/prompts.py +++ b/src/core/prompts.py @@ -36,6 +36,7 @@ - `orcidId` (valid URL, **optional**) - `affiliation` (list of strings, **optional**): Institutions the author is affiliated with. Do not mention Imaging Plaza unless is explicity mentioned. - `relatedToOrganization` (list of strings, **optional**): Institutions associated with the software. Do not mention Imaging Plaza unless is explicity mentioned. +- `relatedToOrganizationJustification` (list of strings, **optional**): Justification for the related organizations. - `softwareRequirements` (list of strings, **optional**): Dependencies or prerequisites for running the software. - `operatingSystem` (list of strings, **optional**): Compatible operating systems. Use only Windows, Linux, MacOS, or Other. - `programmingLanguage` (list of strings, **optional**): Programming languages used in the software. @@ -92,18 +93,60 @@ - `hasExecutableInstructions` (string, **optional**): Any exectuable instructions related to the software. This should point to an URL where the installation is explained. If this is the README file, please make the full URL. - `readme` (valid URL, **optional**): README url of the software (at the root of the repo) - `imagingModality (list of strings, **optional**): imaging modalities accepted by the software. +- `discipline` (string, **optional**): Scientific discipline the software belongs to. Base your response on the README and other documentation files content. +- `disciplineJustification` (list of strings, **optional**): Justification for the discipline classification. +- `repositoryType` (string, **optional**): Type of repository (e.g., software, educational resource, documentation, data, other). +- `respositoryTypeJustification` (list of strings, **optional**): Justification for the repository type classification. 
+ +PLEASE PROVIDE THE OUTPUT IN JSON FORMAT ONLY, WITHOUT ANY EXPLANATION OR ADDITIONAL TEXT. ALIGN THE RESPONSE TO THE SCHEMA SPECIFICATION. +""" + + + +system_prompt_user_content = """ +You are a helpful assistant, expert in academic organizations and open source software development. +Please parse this information extracted from a GITHUB user profile and fill the json schema provided. +Do not make new fields if they are not in the schema. + +Also, please add EPFL to relatedToOrganizations if the person is affiliated with any EPFL lab or center. +- Check for github organizations related to an institution, companies, universities, or research centers. +- Include also the offices, units, labs or departments within the organization or company. These are usually reflected in individual github organizations. +- Pay attentions to the organizations in github, some of them reflect the units or departments and not the main institution, add boths. +- Sometimes an organization can guide you to identify the acronym of the institution, company or university. And use that to discover the affiliation to a specific team or center. +- Add as many relatedOrganizations as you can find, but do not add the user name as a related organization. +- Justify the response by providing the relatedToOrganizationJustification field. +- Try to write the organizations name correctly, with the correct capitalization and spelling. + +On the other hand, always add related Disciplines and justify the response in a common field. + +Respect the schema provided and do not add new fields. +""" + -When dealing with Organization pay attention to -- -- -- +system_prompt_org_content = """ +Please parse this information extracted from a GITHUB organization profile and fill the json schema provided. +Do not make new fields if they are not in the schema. 
-When parsing Persons note: -- -- -- +📌 **Schema Specification for GitHub Organization:** +- `name` (string, **optional**): Name of the GitHub organization. +- `organizationType` (string, **optional**): Type of organization (e.g., "University", "Research Institute", "Company", "Non-profit", "Government", "Laboratory", "Other"). +- `organizationTypeJustification` (string, **optional**): Justification for the organization type classification. +- `description` (string, **optional**): Description of the organization from their GitHub profile. +- `relatedToOrganization` (list of strings, **optional**): Parent institutions, companies, universities, or research centers that this organization is affiliated with. Do not add its own name. +- `relatedToOrganizationJustification` (list of strings, **optional**): Justification for each related organization identified. +- `discipline` (list of objects, **optional**): Scientific disciplines or fields related to this organization's work. +- `disciplineJustification` (list of strings, **optional**): Justification for the discipline classification. +🔍 **Instructions:** +1. Analyze the GitHub organization profile information provided. +2. Identify the organization type based on their description, repositories, and activities. +3. Look for connections to parent institutions - if it's a lab, identify the university; if it's a department, identify the company. +4. Add EPFL to relatedToOrganization if the organization is affiliated with any EPFL lab, center, or department. +5. Examine the organization's repositories and activities to determine relevant scientific disciplines. +6. Pay attention to acronyms and abbreviations that might indicate institutional affiliations. +7. Use correct capitalization and spelling for organization names. +8. Provide clear justifications for your classifications. PLEASE PROVIDE THE OUTPUT IN JSON FORMAT ONLY, WITHOUT ANY EXPLANATION OR ADDITIONAL TEXT. ALIGN THE RESPONSE TO THE SCHEMA SPECIFICATION.
""" diff --git a/src/core/users_parser.py b/src/core/users_parser.py new file mode 100644 index 0000000..5c96e16 --- /dev/null +++ b/src/core/users_parser.py @@ -0,0 +1,871 @@ +import requests +import json +import re +import base64 +from typing import Dict, List, Optional, Any +from pydantic import BaseModel, Field, validator +from datetime import datetime +import os +from dotenv import load_dotenv + +from bs4 import BeautifulSoup +from selenium import webdriver +from selenium.webdriver.common.by import By +from selenium.webdriver.support.ui import WebDriverWait +from selenium.webdriver.support import expected_conditions as EC +from selenium.webdriver.chrome.options import Options +from selenium.webdriver.common.desired_capabilities import DesiredCapabilities +from selenium.webdriver.firefox.options import Options + +import time + +load_dotenv() + +GITHUB_TOKEN = os.environ["GITHUB_TOKEN"] +SELENIUM_REMOTE_URL = os.environ.get("SELENIUM_REMOTE_URL", "http://localhost:4444") + + +class ORCIDEmployment(BaseModel): + """ORCID employment entry""" + organization: str = Field(..., description="Organization name") + role: Optional[str] = Field(None, description="Job title/role") + start_date: Optional[str] = Field(None, description="Start date") + end_date: Optional[str] = Field(None, description="End date") + location: Optional[str] = Field(None, description="Location") + duration_years: Optional[float] = Field(None, description="Duration in years") + + +class ORCIDEducation(BaseModel): + """ORCID education entry""" + organization: str = Field(..., description="Educational institution") + degree: Optional[str] = Field(None, description="Degree or qualification") + start_date: Optional[str] = Field(None, description="Start date") + end_date: Optional[str] = Field(None, description="End date") + location: Optional[str] = Field(None, description="Location") + duration_years: Optional[float] = Field(None, description="Duration in years") + + +class 
ORCIDActivities(BaseModel): + """ORCID activities data""" + employment: List[ORCIDEmployment] = Field(default_factory=list, description="Employment history") + education: List[ORCIDEducation] = Field(default_factory=list, description="Education history") + works_count: Optional[int] = Field(None, description="Number of works/publications") + peer_reviews_count: Optional[int] = Field(None, description="Number of peer reviews") + orcid_content: Optional[str] = Field(None, description="Parsed ORCID Activities content as Markdown") + orcid_format: Optional[str] = Field(default="markdown", description="Format of orcid_content") + + +class GitHubUserMetadata(BaseModel): + """Pydantic model to store GitHub user metadata with validation""" + login: str = Field(..., description="GitHub username") + name: Optional[str] = Field(None, description="User's display name") + bio: Optional[str] = Field(None, description="User's bio") + email: Optional[str] = Field(None, description="User's public email") + location: Optional[str] = Field(None, description="User's location") + company: Optional[str] = Field(None, description="User's company") + blog: Optional[str] = Field(None, description="User's blog URL") + twitter_username: Optional[str] = Field(None, description="Twitter username") + public_repos: int = Field(..., ge=0, description="Number of public repositories") + public_gists: int = Field(..., ge=0, description="Number of public gists") + followers: int = Field(..., ge=0, description="Number of followers") + following: int = Field(..., ge=0, description="Number of users following") + created_at: str = Field(..., description="Account creation date") + updated_at: str = Field(..., description="Last profile update date") + avatar_url: str = Field(..., description="Avatar image URL") + html_url: str = Field(..., description="GitHub profile URL") + orcid: Optional[str] = Field(None, description="ORCID identifier") + orcid_activities: Optional[ORCIDActivities] = Field(None, 
description="ORCID activities data") + organizations: List[str] = Field(default_factory=list, description="Public organizations") + social_accounts: List[Dict[str, str]] = Field(default_factory=list, description="Social media accounts") + readme_url: Optional[str] = Field(None, description="Profile README URL if exists") + readme_content: Optional[str] = Field(None, description="Profile README content if exists") + + @validator('orcid') + def validate_orcid(cls, v): + """Validate ORCID format""" + if v is not None: + orcid_pattern = r'^\d{4}-\d{4}-\d{4}-\d{3}[\dX]$' + if not re.match(orcid_pattern, v): + raise ValueError('Invalid ORCID format') + return v + + @validator('email') + def validate_email(cls, v): + """Basic email validation""" + if v is not None and '@' not in v: + raise ValueError('Invalid email format') + return v + + class Config: + """Pydantic configuration""" + validate_assignment = True + extra = "forbid" + + +class GitHubUsersParser: + """Parser for GitHub user metadata using REST and GraphQL APIs""" + + def __init__(self): + """ + Initialize the parser with optional GitHub token for higher rate limits + + """ + self.github_token = GITHUB_TOKEN + self.rest_base_url = "https://api.github.com" + self.graphql_url = "https://api.github.com/graphql" + + self.headers = { + "Accept": "application/vnd.github.v3+json", + "User-Agent": "GitHubUsersParser/1.0" + } + + if self.github_token: + self.headers["Authorization"] = f"token {self.github_token}" + + def get_user_metadata(self, username: str) -> GitHubUserMetadata: + """ + Retrieve comprehensive user metadata from GitHub + + Args: + username: GitHub username + + Returns: + GitHubUserMetadata object with all available user information + + Raises: + requests.RequestException: If API calls fail + ValueError: If user not found + """ + # Get basic user data from REST API + rest_data = self._get_rest_user_data(username) + + # Get extended data from GraphQL API (social accounts) + graphql_data = 
self._get_graphql_user_data(username) + + # Get organizations + organizations = self._get_user_organizations(username) + + # Check for README and get content + readme_data = self._get_user_readme(username) + + # Scrape ORCID from profile page + orcid = self._scrape_orcid_from_profile(username) + + # Get ORCID activities if ORCID is found + orcid_activities = None + if orcid: + orcid_activities = self._scrape_orcid_activities(orcid) + + # Combine all data and create Pydantic model + user_data = { + "login": rest_data["login"], + "name": rest_data.get("name"), + "bio": rest_data.get("bio"), + "email": rest_data.get("email"), + "location": rest_data.get("location"), + "company": rest_data.get("company"), + "blog": rest_data.get("blog"), + "twitter_username": rest_data.get("twitter_username"), + "public_repos": rest_data["public_repos"], + "public_gists": rest_data["public_gists"], + "followers": rest_data["followers"], + "following": rest_data["following"], + "created_at": rest_data["created_at"], + "updated_at": rest_data["updated_at"], + "avatar_url": rest_data["avatar_url"], + "html_url": rest_data["html_url"], + "orcid": orcid, + "orcid_activities": orcid_activities, + "organizations": organizations, + "social_accounts": graphql_data.get("social_accounts", []), + "readme_url": readme_data.get("url"), + "readme_content": readme_data.get("content") + } + + return GitHubUserMetadata(**user_data) + + def _scrape_orcid_from_profile(self, username: str) -> Optional[str]: + """ + Scrape ORCID from GitHub profile page + + Args: + username: GitHub username + + Returns: + ORCID ID if found, None otherwise + """ + try: + profile_url = f"https://github.com/{username}" + + # Use a browser-like user agent to avoid blocking + scraping_headers = { + "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" + } + + response = requests.get(profile_url, headers=scraping_headers, timeout=10) + + if 
response.status_code != 200: + return None + + soup = BeautifulSoup(response.content, 'html.parser') + + # Look for ORCID links in social links section + # Target the specific element structure you mentioned + orcid_links = soup.find_all('a', href=re.compile(r'https://orcid\.org/\d{4}-\d{4}-\d{4}-\d{3}[\dX]')) + + if orcid_links: + orcid_url = orcid_links[0]['href'] + # Extract ORCID ID from URL + orcid_match = re.search(r'(\d{4}-\d{4}-\d{4}-\d{3}[\dX])', orcid_url) + if orcid_match: + return orcid_match.group(1) + + # Alternative: Look in all text content for ORCID patterns + page_text = soup.get_text() + orcid_patterns = [ + r'https://orcid\.org/(\d{4}-\d{4}-\d{4}-\d{3}[\dX])', + r'orcid\.org/(\d{4}-\d{4}-\d{4}-\d{3}[\dX])', + r'\b(\d{4}-\d{4}-\d{4}-\d{3}[\dX])\b' + ] + + for pattern in orcid_patterns: + match = re.search(pattern, page_text) + if match: + return match.group(1) + + return None + + except Exception as e: + print(f"Warning: Could not scrape ORCID from profile: {e}") + return None + + def _scrape_orcid_activities(self, orcid_id: str) -> Optional[ORCIDActivities]: + """ + Scrape activities from ORCID profile page using Selenium + + Args: + orcid_id: ORCID identifier (e.g., "0000-0002-8076-2034") + + Returns: + ORCIDActivities object with employment and education data + """ + driver = None + try: + orcid_url = f"https://orcid.org/{orcid_id}" + + options = Options() + options.headless = True + options.add_argument("--no-sandbox") + options.add_argument("--disable-dev-shm-usage") + options.add_argument("--width=1920") + options.add_argument("--height=1080") + options.add_argument("--user-agent=Mozilla/5.0 (X11; Linux x86_64; rv:140.0) Gecko/20100101 Firefox/140.0") + + # Set Firefox capabilities + capabilities = DesiredCapabilities.FIREFOX.copy() + capabilities['browserName'] = 'firefox' + + driver = webdriver.Remote( + command_executor=SELENIUM_REMOTE_URL, + options=options, + ) + driver.get(orcid_url) + + # Wait for the page to load + 
WebDriverWait(driver, 10).until( + EC.presence_of_element_located((By.TAG_NAME, "body")) + ) + + # Wait a bit more for dynamic content to load + time.sleep(3) + + # Get the page source and parse with BeautifulSoup + html_content = driver.page_source + soup = BeautifulSoup(html_content, 'html.parser') + + # Extract raw HTML from Activities section + activities_html = self._extract_activities_html(soup) + + # Extract employment data + employment_list = self._extract_employment_from_orcid_selenium(soup) + + # Extract education data + education_list = self._extract_education_from_orcid_selenium(soup) + + # Extract activity counts + works_count = self._extract_works_count_selenium(soup) + peer_reviews_count = self._extract_peer_reviews_count_selenium(soup) + + return ORCIDActivities( + employment=employment_list, + education=education_list, + works_count=works_count, + peer_reviews_count=peer_reviews_count, + orcid_content=activities_html + ) + + except Exception as e: + print(f"Warning: Could not scrape ORCID activities: {e}") + return None + finally: + if driver: + driver.quit() + + def _extract_employment_from_orcid_selenium(self, soup: BeautifulSoup) -> List[ORCIDEmployment]: + """Extract employment information from ORCID page using Selenium-rendered HTML""" + employment_list = [] + + try: + # Look for employment section + employment_section = soup.find('section', {'id': 'affiliations'}) + if not employment_section: + print("Warning: Employment section not found") + return employment_list + + # Find employment entries - they might be in different containers + employment_containers = employment_section.find_all(['app-affiliation-stack-group', 'div'], + class_=re.compile(r'affiliation|employment')) + + if not employment_containers: + # Try alternative selectors + employment_containers = employment_section.find_all('div', + string=re.compile(r'\d{4}')) + + print(f"Found {len(employment_containers)} employment containers") + + for container in employment_containers: + 
try: + # Extract text content + text_content = container.get_text(separator=' ', strip=True) + + # Skip if empty or too short + if len(text_content) < 10: + continue + + # Extract organization name (usually the first substantial text) + organization = self._extract_organization_name(text_content) + + # Extract dates + start_date, end_date = self._extract_dates_from_text(text_content) + + # Extract role/title + role = self._extract_role_from_text(text_content) + + # Extract location + location = self._extract_location_from_text(text_content) + + # Calculate duration + duration_years = self._calculate_duration(start_date, end_date) + + # Only add if we have at least an organization + if organization: + employment_list.append(ORCIDEmployment( + organization=organization, + role=role, + start_date=start_date, + end_date=end_date, + location=location, + duration_years=duration_years + )) + print(f"Added employment: {organization}") + + except Exception as e: + print(f"Warning: Could not parse employment entry: {e}") + continue + + except Exception as e: + print(f"Warning: Could not extract employment data: {e}") + + return employment_list + + def _extract_education_from_orcid_selenium(self, soup: BeautifulSoup) -> List[ORCIDEducation]: + """Extract education information from ORCID page using Selenium-rendered HTML""" + education_list = [] + + try: + # Look for education section + education_section = soup.find('section', {'id': 'education-and-qualification'}) + if not education_section: + print("Warning: Education section not found") + return education_list + + # Find education entries + education_containers = education_section.find_all(['app-affiliation-stack-group', 'div'], + class_=re.compile(r'affiliation|education')) + + if not education_containers: + education_containers = education_section.find_all('div', + string=re.compile(r'\d{4}')) + + print(f"Found {len(education_containers)} education containers") + + for container in education_containers: + try: + 
text_content = container.get_text(separator=' ', strip=True) + + if len(text_content) < 10: + continue + + organization = self._extract_organization_name(text_content) + start_date, end_date = self._extract_dates_from_text(text_content) + degree = self._extract_degree_from_text(text_content) + location = self._extract_location_from_text(text_content) + duration_years = self._calculate_duration(start_date, end_date) + + if organization: + education_list.append(ORCIDEducation( + organization=organization, + degree=degree, + start_date=start_date, + end_date=end_date, + location=location, + duration_years=duration_years + )) + print(f"Added education: {organization}") + + except Exception as e: + print(f"Warning: Could not parse education entry: {e}") + continue + + except Exception as e: + print(f"Warning: Could not extract education data: {e}") + + return education_list + + def _extract_works_count_selenium(self, soup: BeautifulSoup) -> Optional[int]: + """Extract works count from ORCID page using Selenium-rendered HTML""" + try: + # Look for works section with count + works_patterns = [ + r'Works.*\((\d+)\)', + r'(\d+)\s+works', + r'(\d+)\s+publications' + ] + + page_text = soup.get_text() + + for pattern in works_patterns: + match = re.search(pattern, page_text, re.IGNORECASE) + if match: + count = int(match.group(1)) + print(f"Found works count: {count}") + return count + + except Exception as e: + print(f"Warning: Could not extract works count: {e}") + return None + + def _extract_peer_reviews_count_selenium(self, soup: BeautifulSoup) -> Optional[int]: + """Extract peer reviews count from ORCID page using Selenium-rendered HTML""" + try: + # Look for peer review section with count + peer_review_patterns = [ + r'(\d+)\s+reviews?\s+for\s+(\d+)\s+publications', + r'Peer review.*\((\d+)\s+reviews?', + r'(\d+)\s+peer\s+reviews?' 
+ ] + + page_text = soup.get_text() + + for pattern in peer_review_patterns: + match = re.search(pattern, page_text, re.IGNORECASE) + if match: + count = int(match.group(1)) + print(f"Found peer reviews count: {count}") + return count + + except Exception as e: + print(f"Warning: Could not extract peer reviews count: {e}") + return None + + def _extract_organization_name(self, text: str) -> Optional[str]: + """Extract organization name from text""" + # Split by common separators and take the first substantial part + parts = re.split(r'[,\n\t]', text) + for part in parts: + part = part.strip() + # Look for text that's not just dates or common words + if len(part) > 3 and not re.match(r'^\d{4}', part): + return part + return None + + def _extract_dates_from_text(self, text: str) -> tuple[Optional[str], Optional[str]]: + """Extract start and end dates from text""" + # Look for "YYYY to YYYY" pattern first (most specific) + to_pattern = r'\b(\d{4})\s+to\s+(\d{4})\b' + to_match = re.search(to_pattern, text) + if to_match: + return to_match.group(1), to_match.group(2) + + # Look for other date patterns as fallback + date_patterns = [ + r'\b(\d{1,2}[/-]\d{4})\b', # MM/YYYY or MM-YYYY + r'\b(\d{4})\b' # YYYY + ] + + dates = [] + for pattern in date_patterns: + matches = re.findall(pattern, text) + dates.extend(matches) + + # Remove duplicates while preserving order + unique_dates = [] + for date in dates: + if date not in unique_dates: + unique_dates.append(date) + + if len(unique_dates) >= 2: + return unique_dates[0], unique_dates[1] + elif len(unique_dates) == 1: + return unique_dates[0], None + + return None, None + + def _extract_role_from_text(self, text: str) -> Optional[str]: + """Extract role/title from text""" + # Common role indicators + role_keywords = ['professor', 'researcher', 'scientist', 'director', 'manager', 'analyst', 'engineer'] + + words = text.lower().split() + for i, word in enumerate(words): + if any(keyword in word for keyword in role_keywords): + 
# Return a few words around the keyword + start = max(0, i-1) + end = min(len(words), i+3) + return ' '.join(words[start:end]).title() + + return None + + def _extract_degree_from_text(self, text: str) -> Optional[str]: + """Extract degree from text""" + degree_patterns = [ + r'\b(Ph\.?D\.?|PhD|Doctor of Philosophy)\b', + r'\b(M\.?S\.?|MS|Master of Science)\b', + r'\b(M\.?A\.?|MA|Master of Arts)\b', + r'\b(B\.?S\.?|BS|Bachelor of Science)\b', + r'\b(B\.?A\.?|BA|Bachelor of Arts)\b', + r'\b(Bachelor|Master|Doctor)\s+[oO]f\s+\w+\b' + ] + + for pattern in degree_patterns: + match = re.search(pattern, text, re.IGNORECASE) + if match: + return match.group(0) + + return None + + def _extract_location_from_text(self, text: str) -> Optional[str]: + """Extract location from text""" + # Look for patterns like "City, Country" or "State, USA" + location_pattern = r'\b([A-Z][a-z]+(?:\s+[A-Z][a-z]+)*),\s*([A-Z][a-z]+(?:\s+[A-Z][a-z]+)*)\b' + match = re.search(location_pattern, text) + if match: + return f"{match.group(1)}, {match.group(2)}" + + return None + + def _extract_employment_from_orcid(self, soup: BeautifulSoup) -> List[ORCIDEmployment]: + """Extract employment information from ORCID page""" + employment_list = [] + + try: + # Look for employment section + employment_section = soup.find('section', {'id': 'affiliations'}) + if not employment_section: + return employment_list + + # Find employment panels + employment_panels = employment_section.find_all('app-affiliation-stack-group') + + for panel in employment_panels: + # Extract organization name + org_elements = panel.find_all(string=re.compile(r'\S+')) + organization = None + role = None + start_date = None + end_date = None + location = None + + # Try to extract structured data from text content + text_content = panel.get_text(strip=True) + + # Look for date patterns (YYYY or MM/YYYY) + date_matches = re.findall(r'\b(\d{4}|\d{1,2}/\d{4})\b', text_content) + + if len(date_matches) >= 2: + start_date = date_matches[0] 
+ end_date = date_matches[1] if date_matches[1] != 'present' else None + elif len(date_matches) == 1: + start_date = date_matches[0] + + # Calculate duration if we have dates + duration_years = self._calculate_duration(start_date, end_date) + + # This is a simplified extraction - ORCID's dynamic content makes it complex + # You might need to use Selenium for better extraction + employment_list.append(ORCIDEmployment( + organization=organization or "Unknown Organization", + role=role, + start_date=start_date, + end_date=end_date, + location=location, + duration_years=duration_years + )) + + except Exception as e: + print(f"Warning: Could not extract employment data: {e}") + + return employment_list + + def _extract_education_from_orcid(self, soup: BeautifulSoup) -> List[ORCIDEducation]: + """Extract education information from ORCID page""" + education_list = [] + + try: + # Look for education section + education_section = soup.find('section', {'id': 'education-and-qualification'}) + if not education_section: + return education_list + + # Similar extraction logic as employment + # This is simplified - actual implementation would need more sophisticated parsing + + except Exception as e: + print(f"Warning: Could not extract education data: {e}") + + return education_list + + def _extract_works_count(self, soup: BeautifulSoup) -> Optional[int]: + """Extract works count from ORCID page""" + try: + # Look for works section with count + works_text = soup.find(string=re.compile(r'Works.*\((\d+)\)')) + if works_text: + match = re.search(r'\((\d+)\)', works_text) + if match: + return int(match.group(1)) + except Exception: + pass + return None + + def _extract_peer_reviews_count(self, soup: BeautifulSoup) -> Optional[int]: + """Extract peer reviews count from ORCID page""" + try: + # Look for peer review section with count + peer_review_text = soup.find(string=re.compile(r'Peer review.*\((\d+)\s+reviews')) + if peer_review_text: + match = re.search(r'\((\d+)\s+reviews', 
peer_review_text) + if match: + return int(match.group(1)) + except Exception: + pass + return None + + def _calculate_duration(self, start_date: Optional[str], end_date: Optional[str]) -> Optional[float]: + """Calculate duration in years between start and end dates""" + if not start_date: + return None + + try: + # Parse start year + start_year = int(start_date.split('/')[-1]) + + # If no end date, assume current year + if not end_date: + end_year = datetime.now().year + else: + end_year = int(end_date.split('/')[-1]) + + return float(end_year - start_year) + + except (ValueError, IndexError): + return None + + def _get_rest_user_data(self, username: str) -> Dict[str, Any]: + """Get basic user data from REST API""" + url = f"{self.rest_base_url}/users/{username}" + response = requests.get(url, headers=self.headers) + + if response.status_code == 404: + raise ValueError(f"User '{username}' not found") + + response.raise_for_status() + return response.json() + + def _get_graphql_user_data(self, username: str) -> Dict[str, Any]: + """Get extended user data from GraphQL API including social accounts""" + query = """ + query($username: String!) { + user(login: $username) { + socialAccounts(first: 10) { + nodes { + provider + url + displayName + } + } + ... 
on User { + bio + } + } + } + """ + + variables = {"username": username} + + payload = { + "query": query, + "variables": variables + } + + headers = self.headers.copy() + headers["Content-Type"] = "application/json" + + response = requests.post( + self.graphql_url, + headers=headers, + data=json.dumps(payload) + ) + + if response.status_code != 200: + # If GraphQL fails, return empty data + return {"social_accounts": []} + + data = response.json() + + if "errors" in data: + return {"social_accounts": []} + + user_data = data["data"]["user"] + if not user_data: + return {"social_accounts": []} + + # Extract social accounts + social_accounts = [] + if user_data.get("socialAccounts") and user_data["socialAccounts"].get("nodes"): + for account in user_data["socialAccounts"]["nodes"]: + social_accounts.append({ + "provider": account["provider"], + "url": account["url"], + "display_name": account.get("displayName", "") + }) + + return { + "social_accounts": social_accounts + } + + def _get_user_organizations(self, username: str) -> List[str]: + """Get user's public organizations""" + url = f"{self.rest_base_url}/users/{username}/orgs" + response = requests.get(url, headers=self.headers) + + if response.status_code != 200: + return [] + + orgs_data = response.json() + return [org["login"] for org in orgs_data] + + def _get_user_readme(self, username: str) -> Dict[str, Optional[str]]: + """Get user's README URL and content if it exists""" + # Try to get README from API first + url = f"{self.rest_base_url}/repos/{username}/{username}/readme" + response = requests.get(url, headers=self.headers) + + if response.status_code == 200: + readme_data = response.json() + content = self._decode_readme_content(readme_data.get("content", "")) + return { + "url": f"https://github.com/{username}/{username}/blob/main/README.md", + "content": content + } + + # Try master branch as fallback + url = f"{self.rest_base_url}/repos/{username}/{username}/contents/README.md" + response = 
requests.get(url, headers=self.headers) + + if response.status_code == 200: + readme_data = response.json() + content = self._decode_readme_content(readme_data.get("content", "")) + return { + "url": f"https://github.com/{username}/{username}/blob/master/README.md", + "content": content + } + + return {"url": None, "content": None} + + def _decode_readme_content(self, encoded_content: str) -> Optional[str]: + """Decode base64 encoded README content""" + if not encoded_content: + return None + + try: + # GitHub API returns content in base64 format + decoded_bytes = base64.b64decode(encoded_content) + return decoded_bytes.decode('utf-8') + except Exception as e: + print(f"Warning: Could not decode README content: {e}") + return None + + def _extract_activities_html(self, soup: BeautifulSoup) -> Optional[str]: + """Extract text content from ORCID Activities section""" + try: + # Find the Activities section by aria-label + activities_section = soup.find('section', {'aria-label': 'Activities'}) + + if not activities_section: + print("Warning: Activities section not found") + return None + + # Extract all text content from the Activities section + activities_text = activities_section.get_text(separator='\n', strip=True) + + # Clean up the text - remove excessive whitespace and empty lines + lines = [line.strip() for line in activities_text.split('\n') if line.strip()] + cleaned_text = '\n'.join(lines) + + print(f"Extracted {len(lines)} lines of activities text") + return cleaned_text + + except Exception as e: + print(f"Warning: Could not extract Activities text: {e}") + return None + + +def is_it_github_user(username: str) -> bool: + """ + Check if the given username is a valid GitHub user. 
+ + Args: + username: GitHub username to check + + Returns: + True if user exists, False otherwise + """ + parser = GitHubUsersParser() + try: + parser.get_user_metadata(username) + return True + except ValueError: + return False + except requests.RequestException as e: + print(f"API Error: {e}") + return False + + +def parse_github_user(username: str) -> GitHubUserMetadata: + parser = GitHubUsersParser() + + try: + # Get user metadata + user_metadata = parser.get_user_metadata(username) + + # Export to JSON + #print("\nJSON representation:") + #print(json.dumps(user_metadata.dict(), indent=2)) + + return user_metadata + + except ValueError as e: + print(f"Error: {e}") + except requests.RequestException as e: + print(f"API Error: {e}") \ No newline at end of file diff --git a/src/files/json-ld-context.json b/src/files/json-ld-context.json index a48a4fd..f5e039d 100644 --- a/src/files/json-ld-context.json +++ b/src/files/json-ld-context.json @@ -76,7 +76,9 @@ "hasRorId": "md4i:hasRorId", "legalName": "schema:legalName", "fundingGrant": "sd:fundingGrant", - "fundingSource": "sd:fundingSource" + "fundingSource": "sd:fundingSource", + "discipline": "pulse:discipline", + "repositoryType": "pulse:repositoryType" } } diff --git a/src/test/test_web_scraper.py b/src/test/test_web_scraper.py new file mode 100644 index 0000000..e69de29 diff --git a/src/utils/utils.py b/src/utils/utils.py index bf5763d..43a5e28 100644 --- a/src/utils/utils.py +++ b/src/utils/utils.py @@ -83,18 +83,82 @@ def merge_jsonld(gimie_graph: list, llm_jsonld: dict, output_path: str = None): from pydantic import HttpUrl, BaseModel from typing import Any -def convert_httpurl_to_str(obj: Any) -> Any: +# def convert_httpurl_to_str(obj: Any) -> Any: +# """ +# Recursively convert all HttpUrl fields in a Pydantic model (or nested structures) +# to plain strings, so the resulting dict is OpenAI-compatible. 
+# """ +# if isinstance(obj, HttpUrl): +# return str(obj) +# elif isinstance(obj, BaseModel): +# return {k: convert_httpurl_to_str(v) for k, v in obj.dict(exclude_none=True).items()} +# elif isinstance(obj, list): +# return [convert_httpurl_to_str(item) for item in obj] +# elif isinstance(obj, dict): +# return {k: convert_httpurl_to_str(v) for k, v in obj.items()} +# else: +# return obj + +import json +from pydantic import create_model, HttpUrl, BaseModel +from typing import get_origin, get_args, Union, List, Any, get_type_hints +import inspect + +def convert_httpurl_to_str(schema_class): """ - Recursively convert all HttpUrl fields in a Pydantic model (or nested structures) - to plain strings, so the resulting dict is OpenAI-compatible. + Convert HttpUrl fields to str fields for OpenAI compatibility, including nested models. """ - if isinstance(obj, HttpUrl): - return str(obj) - elif isinstance(obj, BaseModel): - return {k: convert_httpurl_to_str(v) for k, v in obj.dict(exclude_none=True).items()} - elif isinstance(obj, list): - return [convert_httpurl_to_str(item) for item in obj] - elif isinstance(obj, dict): - return {k: convert_httpurl_to_str(v) for k, v in obj.items()} + if not issubclass(schema_class, BaseModel): + return schema_class + + # Get the original fields + original_fields = schema_class.model_fields + new_fields = {} + + for field_name, field_info in original_fields.items(): + annotation = field_info.annotation + converted_annotation = _convert_annotation(annotation) + new_fields[field_name] = (converted_annotation, field_info.default) + + # Create new model class with converted fields + converted_model = create_model( + f"{schema_class.__name__}Converted", + **new_fields + ) + + return converted_model + +def _convert_annotation(annotation): + """ + Recursively convert annotations, replacing HttpUrl with str and handling nested models. + """ + origin = get_origin(annotation) + + # Handle Union types (Optional, etc.) 
+ if origin is Union: + args = get_args(annotation) + new_args = tuple(_convert_annotation(arg) for arg in args) + return Union[new_args] + + # Handle List types + elif origin is list or origin is List: + args = get_args(annotation) + if args: + new_args = tuple(_convert_annotation(arg) for arg in args) + return List[new_args[0]] if len(new_args) == 1 else List[new_args] + return annotation + + # Handle HttpUrl -> str conversion + elif annotation is HttpUrl: + return str + + # Handle nested BaseModel classes + elif (inspect.isclass(annotation) and + issubclass(annotation, BaseModel) and + annotation is not BaseModel): + return convert_httpurl_to_str(annotation) + + # Return unchanged for all other types else: - return obj \ No newline at end of file + return annotation +