diff --git a/config.py b/config.py
index 7666d0ac..f384402c 100644
--- a/config.py
+++ b/config.py
@@ -1,5 +1,6 @@
 import os
+
 ####################################
 # Load .env file
 ####################################

@@ -12,4 +13,12 @@
     print("dotenv not installed, skipping...")

 API_KEY = os.getenv("PIPELINES_API_KEY", "0p3n-w3bu!")
+
+LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
+
 PIPELINES_DIR = os.getenv("PIPELINES_DIR", "./pipelines")
+RESET_PIPELINES_DIR = os.getenv("RESET_PIPELINES_DIR", "false").lower() == "true"
+PIPELINES_REQUIREMENTS_PATH = os.getenv("PIPELINES_REQUIREMENTS_PATH")
+PIPELINES_URLS = os.getenv("PIPELINES_URLS")
+
+SUPPRESS_PIP_OUTPUT = os.getenv("SUPPRESS_PIP_OUTPUT", "true").lower() == "true"
diff --git a/main.py b/main.py
index 33c04997..8eb61b65 100644
--- a/main.py
+++ b/main.py
@@ -2,37 +2,43 @@
 from fastapi.middleware.cors import CORSMiddleware
 from fastapi.concurrency import run_in_threadpool
-
-from starlette.responses import StreamingResponse, Response
-from pydantic import BaseModel, ConfigDict
-from typing import List, Union, Generator, Iterator
-
-
-from utils.pipelines.auth import bearer_security, get_current_user
-from utils.pipelines.main import get_last_user_message, stream_message_template
-from utils.pipelines.misc import convert_to_raw_url
+from pydantic import BaseModel
+from starlette.responses import StreamingResponse
+from typing import Generator, Iterator

 from contextlib import asynccontextmanager
-from concurrent.futures import ThreadPoolExecutor
-
-from schemas import FilterForm, OpenAIChatCompletionForm
-from urllib.parse import urlparse

 import shutil
-import aiohttp
 import os
 import importlib.util
-import logging
 import time
 import json
 import uuid
-import sys
-import subprocess

+from schemas import FilterForm, OpenAIChatCompletionForm

-from config import API_KEY, PIPELINES_DIR
+from utils.pipelines.logger import setup_logger

-if not os.path.exists(PIPELINES_DIR):
-    os.makedirs(PIPELINES_DIR)
+logger = setup_logger(__name__)
+
+from utils.pipelines.auth import bearer_security, get_current_user
+from utils.pipelines.downloads import (
+    download_file_to_folder,
+    download_pipelines,
+    install_requirements,
+    install_requirements_from_file,
+    reset_pipelines_dir,
+)
+from utils.pipelines.main import get_last_user_message, stream_message_template
+from utils.pipelines.misc import convert_to_raw_url
+
+from config import (
+    API_KEY,
+    PIPELINES_DIR,
+    PIPELINES_REQUIREMENTS_PATH,
+    PIPELINES_URLS,
+    RESET_PIPELINES_DIR,
+)

 PIPELINES = {}
@@ -106,62 +112,30 @@ def get_all_pipelines():
     return pipelines

-def parse_frontmatter(content):
-    frontmatter = {}
-    for line in content.split('\n'):
-        if ':' in line:
-            key, value = line.split(':', 1)
-            frontmatter[key.strip().lower()] = value.strip()
-    return frontmatter
-
-def install_frontmatter_requirements(requirements):
-    if requirements:
-        req_list = [req.strip() for req in requirements.split(',')]
-        for req in req_list:
-            print(f"Installing requirement: {req}")
-            subprocess.check_call([sys.executable, "-m", "pip", "install", req])
-    else:
-        print("No requirements found in frontmatter.")

 async def load_module_from_path(module_name, module_path):
     try:
-        # Read the module content
-        with open(module_path, 'r') as file:
-            content = file.read()
-
-        # Parse frontmatter
-        frontmatter = {}
-        if content.startswith('"""'):
-            end = content.find('"""', 3)
-            if end != -1:
-                frontmatter_content = content[3:end]
-                frontmatter = parse_frontmatter(frontmatter_content)
-
-        # Install requirements if specified
-        if 'requirements' in frontmatter:
-            install_frontmatter_requirements(frontmatter['requirements'])
-
-        # Load the module
+        # Install any frontmatter requirements before importing the module
+        await install_requirements_from_file(module_path)
         spec = importlib.util.spec_from_file_location(module_name, module_path)
         module = importlib.util.module_from_spec(spec)
         spec.loader.exec_module(module)
-        print(f"Loaded module: {module.__name__}")
+        logger.info(f"Loaded module: {module.__name__}")
         if hasattr(module, "Pipeline"):
             return module.Pipeline()
         else:
             raise Exception("No Pipeline class found")
     except Exception as e:
-        print(f"Error loading module: {module_name}")
+        logger.error(f"Error loading module: {module_name}")

-        # Move the file to the error folder
         failed_pipelines_folder = os.path.join(PIPELINES_DIR, "failed")
         if not os.path.exists(failed_pipelines_folder):
             os.makedirs(failed_pipelines_folder)

         failed_file_path = os.path.join(failed_pipelines_folder, f"{module_name}.py")
         os.rename(module_path, failed_file_path)
-        print(e)
+        logger.error(str(e))
         return None
@@ -172,20 +144,25 @@ async def load_modules_from_directory(directory):
     for filename in os.listdir(directory):
         if filename.endswith(".py"):
             module_name = filename[:-3]  # Remove the .py extension
+
+            # Skip __init__.py files for pipeline loading
+            if module_name == "__init__":
+                continue
+
             module_path = os.path.join(directory, filename)

             # Create subfolder matching the filename without the .py extension
             subfolder_path = os.path.join(directory, module_name)
             if not os.path.exists(subfolder_path):
                 os.makedirs(subfolder_path)
-                logging.info(f"Created subfolder: {subfolder_path}")
+                logger.debug(f"Created subfolder: {subfolder_path}")

             # Create a valves.json file if it doesn't exist
             valves_json_path = os.path.join(subfolder_path, "valves.json")
             if not os.path.exists(valves_json_path):
                 with open(valves_json_path, "w") as f:
                     json.dump({}, f)
-                logging.info(f"Created valves.json in: {subfolder_path}")
+                logger.debug(f"Created valves.json in: {subfolder_path}")

             pipeline = await load_module_from_path(module_name, module_path)
             if pipeline:
@@ -203,20 +180,26 @@ async def load_modules_from_directory(directory):
                 valves = ValvesModel(**combined_valves)
                 pipeline.valves = valves

-                logging.info(f"Updated valves for module: {module_name}")
+                logger.debug(f"Updated valves for module: {module_name}")

             pipeline_id = pipeline.id if hasattr(pipeline, "id") else module_name
             PIPELINE_MODULES[pipeline_id] = pipeline
             PIPELINE_NAMES[pipeline_id] = module_name
-            logging.info(f"Loaded module: {module_name}")
+            logger.info(f"Loaded module: {module_name}")
         else:
-            logging.warning(f"No Pipeline class found in {module_name}")
+            logger.warning(f"No Pipeline class found in {module_name}")

     global PIPELINES
     PIPELINES = get_all_pipelines()


 async def on_startup():
+    if not os.path.exists(PIPELINES_DIR):
+        os.makedirs(PIPELINES_DIR)
+
+    await reset_pipelines_dir(PIPELINES_DIR, RESET_PIPELINES_DIR)
+    await install_requirements(PIPELINES_REQUIREMENTS_PATH)
+    await download_pipelines(PIPELINES_URLS, PIPELINES_DIR)
+
     await load_modules_from_directory(PIPELINES_DIR)

     for module in PIPELINE_MODULES.values():
@@ -277,7 +260,7 @@ async def check_url(request: Request, call_next):

 @app.get("/v1/models")
 @app.get("/models")
-async def get_models():
+async def get_models(user: str = Depends(get_current_user)):
     """
     Returns the available pipelines
     """
@@ -354,29 +337,6 @@ class AddPipelineForm(BaseModel):
     url: str


-async def download_file(url: str, dest_folder: str):
-    filename = os.path.basename(urlparse(url).path)
-    if not filename.endswith(".py"):
-        raise HTTPException(
-            status_code=status.HTTP_400_BAD_REQUEST,
-            detail="URL must point to a Python file",
-        )
-
-    file_path = os.path.join(dest_folder, filename)
-
-    async with aiohttp.ClientSession() as session:
-        async with session.get(url) as response:
-            if response.status != 200:
-                raise HTTPException(
-                    status_code=status.HTTP_400_BAD_REQUEST,
-                    detail="Failed to download file",
-                )
-            with open(file_path, "wb") as f:
-                f.write(await response.read())
-
-    return file_path
-
-
 @app.post("/v1/pipelines/add")
 @app.post("/pipelines/add")
 async def add_pipeline(
@@ -391,8 +351,8 @@ async def add_pipeline(

     try:
         url = convert_to_raw_url(form_data.url)
-        print(url)
-        file_path = await download_file(url, dest_folder=PIPELINES_DIR)
+        logger.debug(f"Downloading pipeline from {url}")
+        file_path = await download_file_to_folder(url, dest_folder=PIPELINES_DIR)
         await reload()
         return {
             "status": True,
@@ -576,7 +536,7 @@ async def update_valves(pipeline_id: str, form_data: dict):
             if hasattr(pipeline, "on_valves_updated"):
                 await pipeline.on_valves_updated()
     except Exception as e:
-        print(e)
+        logger.error(e)
         raise HTTPException(
             status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
             detail=f"{str(e)}",
@@ -610,7 +570,7 @@ async def filter_inlet(pipeline_id: str, form_data: FilterForm):
         else:
             return form_data.body
     except Exception as e:
-        print(e)
+        logger.error(e)
         raise HTTPException(
             status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
             detail=f"{str(e)}",
@@ -642,7 +602,7 @@ async def filter_outlet(pipeline_id: str, form_data: FilterForm):
         else:
             return form_data.body
     except Exception as e:
-        print(e)
+        logger.error(e)
         raise HTTPException(
             status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
             detail=f"{str(e)}",
@@ -665,13 +625,11 @@ async def generate_openai_chat_completion(form_data: OpenAIChatCompletionForm):
         )

     def job():
-        print(form_data.model)
+        logger.debug(form_data.model)

         pipeline = app.state.PIPELINES[form_data.model]
         pipeline_id = form_data.model

-        print(pipeline_id)
-
         if pipeline["type"] == "manifold":
             manifold_id, pipeline_id = pipeline_id.split(".", 1)
             pipe = PIPELINE_MODULES[manifold_id].pipe
@@ -688,11 +646,11 @@ def stream_content():
                 body=form_data.model_dump(),
             )

-            logging.info(f"stream:true:{res}")
+            logger.info(f"stream:true:{res}")

             if isinstance(res, str):
                 message = stream_message_template(form_data.model, res)
-                logging.info(f"stream_content:str:{message}")
+                logger.info(f"stream_content:str:{message}")
                 yield f"data: {json.dumps(message)}\n\n"

             if isinstance(res, Iterator):
@@ -706,7 +664,7 @@ def stream_content():
                         except:
                             pass

-                    logging.info(f"stream_content:Generator:{line}")
+                    logger.info(f"stream_content:Generator:{line}")

                     if line.startswith("data:"):
                         yield f"{line}\n\n"
@@ -741,7 +699,7 @@ def stream_content():
                 messages=messages,
                 body=form_data.model_dump(),
             )
-            logging.info(f"stream:false:{res}")
+            logger.info(f"stream:false:{res}")

             if isinstance(res, dict):
                 return res
@@ -758,7 +716,7 @@ def stream_content():
                 for stream in res:
                     message = f"{message}{stream}"

-            logging.info(f"stream:false:{message}")
+            logger.info(f"stream:false:{message}")
             return {
                 "id": f"{form_data.model}-{str(uuid.uuid4())}",
                 "object": "chat.completion",
diff --git a/requirements-minimum.txt b/requirements-minimum.txt
index 559be6aa..bb43e3cd 100644
--- a/requirements-minimum.txt
+++ b/requirements-minimum.txt
@@ -11,5 +11,5 @@ PyJWT[crypto]

 requests==2.32.2
 aiohttp==3.9.5
-httpx
-
+gitpython==3.1.43
+httpx
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index 86160d76..5e27d1a8 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -11,6 +11,7 @@ PyJWT[crypto]

 requests==2.32.2
 aiohttp==3.9.5
+gitpython==3.1.43
 httpx

 # AI libraries
diff --git a/start.sh b/start.sh
index e2ddd8b4..825339da 100755
--- a/start.sh
+++ b/start.sh
@@ -1,133 +1,6 @@
 #!/usr/bin/env bash
 PORT="${PORT:-9099}"
 HOST="${HOST:-0.0.0.0}"

-# Default value for PIPELINES_DIR
-PIPELINES_DIR=${PIPELINES_DIR:-./pipelines}
-
-# Function to reset pipelines
-reset_pipelines_dir() {
-  if [ "$RESET_PIPELINES_DIR" = true ]; then
-    echo "Resetting pipelines directory: $PIPELINES_DIR"
-
-    # Check if the directory exists
-    if [ -d "$PIPELINES_DIR" ]; then
-      # Remove all contents of the directory
-      rm -rf "${PIPELINES_DIR:?}"/*
-      echo "All contents in $PIPELINES_DIR have been removed."
-
-      # Optionally recreate the directory if needed
-      mkdir -p "$PIPELINES_DIR"
-      echo "$PIPELINES_DIR has been recreated."
-    else
-      echo "Directory $PIPELINES_DIR does not exist. No action taken."
-    fi
-  else
-    echo "RESET_PIPELINES_DIR is not set to true. No action taken."
-  fi
-}
-
-# Example usage of the function
-reset_pipelines_dir
-
-# Function to install requirements if requirements.txt is provided
-install_requirements() {
-  if [[ -f "$1" ]]; then
-    echo "requirements.txt found at $1. Installing requirements..."
-    pip install -r "$1"
-  else
-    echo "requirements.txt not found at $1. Skipping installation of requirements."
-  fi
-}
-
-# Check if the PIPELINES_REQUIREMENTS_PATH environment variable is set and non-empty
-if [[ -n "$PIPELINES_REQUIREMENTS_PATH" ]]; then
-  # Install requirements from the specified requirements.txt
-  install_requirements "$PIPELINES_REQUIREMENTS_PATH"
-else
-  echo "PIPELINES_REQUIREMENTS_PATH not specified. Skipping installation of requirements."
-fi
-
-
-# Function to download the pipeline files
-download_pipelines() {
-  local path=$1
-  local destination=$2
-
-  # Remove any surrounding quotes from the path
-  path=$(echo "$path" | sed 's/^"//;s/"$//')
-
-  echo "Downloading pipeline files from $path to $destination..."
-
-  if [[ "$path" =~ ^https://github.com/.*/.*/blob/.* ]]; then
-    # It's a single file
-    dest_file=$(basename "$path")
-    curl -L "$path?raw=true" -o "$destination/$dest_file"
-  elif [[ "$path" =~ ^https://github.com/.*/.*/tree/.* ]]; then
-    # It's a folder
-    git_repo=$(echo "$path" | awk -F '/tree/' '{print $1}')
-    subdir=$(echo "$path" | awk -F '/tree/' '{print $2}')
-    git clone --depth 1 --filter=blob:none --sparse "$git_repo" "$destination"
-    (
-      cd "$destination" || exit
-      git sparse-checkout set "$subdir"
-    )
-  elif [[ "$path" =~ \.py$ ]]; then
-    # It's a single .py file (but not from GitHub)
-    dest_file=$(basename "$path")
-    curl -L "$path" -o "$destination/$dest_file"
-  else
-    echo "Invalid URL format: $path"
-    exit 1
-  fi
-}
-
-# Function to parse and install requirements from frontmatter
-install_frontmatter_requirements() {
-  local file=$1
-  local file_content=$(cat "$1")
-  # Extract the first triple-quoted block
-  local first_block=$(echo "$file_content" | awk '/"""/{flag=!flag; if(flag) count++; if(count == 2) {exit}} flag')
-
-  # Check if the block contains requirements
-  local requirements=$(echo "$first_block" | grep -i 'requirements:')
-
-  if [ -n "$requirements" ]; then
-    # Extract the requirements list
-    requirements=$(echo "$requirements" | awk -F': ' '{print $2}' | tr ',' ' ' | tr -d '\r')
-
-    # Construct and echo the pip install command
-    local pip_command="pip install $requirements"
-    echo "$pip_command"
-    pip install $requirements
-  else
-    echo "No requirements found in frontmatter of $file."
-  fi
-}
-
-
-
-# Check if PIPELINES_URLS environment variable is set and non-empty
-if [[ -n "$PIPELINES_URLS" ]]; then
-  if [ ! -d "$PIPELINES_DIR" ]; then
-    mkdir -p "$PIPELINES_DIR"
-  fi
-
-  # Split PIPELINES_URLS by ';' and iterate over each path
-  IFS=';' read -ra ADDR <<< "$PIPELINES_URLS"
-  for path in "${ADDR[@]}"; do
-    download_pipelines "$path" "$PIPELINES_DIR"
-  done
-
-  for file in "$pipelines_dir"/*; do
-    if [[ -f "$file" ]]; then
-      install_frontmatter_requirements "$file"
-    fi
-  done
-else
-  echo "PIPELINES_URLS not specified. Skipping pipelines download and installation."
-fi
-
-
 # Start the server
 uvicorn main:app --host "$HOST" --port "$PORT" --forwarded-allow-ips '*'
diff --git a/utils/pipelines/auth.py b/utils/pipelines/auth.py
index df03ad87..348e6612 100644
--- a/utils/pipelines/auth.py
+++ b/utils/pipelines/auth.py
@@ -1,18 +1,14 @@
 from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
-from fastapi import HTTPException, status, Depends
+from fastapi import Depends, HTTPException, status

-from pydantic import BaseModel
 from typing import Union, Optional
-
 from passlib.context import CryptContext
 from datetime import datetime, timedelta

 import jwt
-import logging
 import os
-import requests
-import uuid

+from config import API_KEY

 SESSION_SECRET = os.getenv("SESSION_SECRET", " ")
 ALGORITHM = "HS256"
@@ -62,4 +58,11 @@ def get_current_user(
     credentials: HTTPAuthorizationCredentials = Depends(bearer_security),
 ) -> Optional[dict]:
     token = credentials.credentials
+
+    if token != API_KEY:
+        raise HTTPException(
+            status_code=status.HTTP_401_UNAUTHORIZED,
+            detail="Invalid API key",
+        )
+
     return token
diff --git a/utils/pipelines/downloads.py b/utils/pipelines/downloads.py
new file mode 100644
index 00000000..c73360b5
--- /dev/null
+++ b/utils/pipelines/downloads.py
@@ -0,0 +1,308 @@
+import aiohttp
+import git
+import os
+import re
+import sys
+import subprocess
+import shutil
+import tempfile
+import time
+import zipfile
+
+from fastapi import status, HTTPException
+from urllib.parse import urlparse
+
+
+from config import SUPPRESS_PIP_OUTPUT
+from .logger import setup_logger
+
+logger = setup_logger(__name__)
+
+
+def parse_frontmatter(content):
+    frontmatter = {}
+    for line in content.split("\n"):
+        if ":" in line:
+            key, value = line.split(":", 1)
+            frontmatter[key.strip().lower()] = value.strip()
+    return frontmatter
+
+
+async def reset_pipelines_dir(PIPELINES_DIR, RESET_PIPELINES_DIR):
+    if RESET_PIPELINES_DIR:
+        logger.info(f"Resetting pipelines directory: {PIPELINES_DIR}")
+        if os.path.isdir(PIPELINES_DIR):
+            max_retries = 3
+            retry_delay = 1  # seconds
+
+            for attempt in range(max_retries):
+                try:
+                    # First try to remove all contents inside the directory
+                    for root, dirs, files in os.walk(PIPELINES_DIR, topdown=False):
+                        for name in files:
+                            file_path = os.path.join(root, name)
+                            try:
+                                os.remove(file_path)
+                            except OSError:
+                                continue
+                        for name in dirs:
+                            dir_path = os.path.join(root, name)
+                            try:
+                                os.rmdir(dir_path)
+                            except OSError:
+                                continue
+
+                    # Then try to remove the main directory
+                    try:
+                        os.rmdir(PIPELINES_DIR)
+                    except OSError:
+                        pass
+
+                    logger.debug(f"All contents in {PIPELINES_DIR} have been removed.")
+                    break
+                except Exception as e:
+                    if attempt < max_retries - 1:
+                        logger.warning(
+                            f"Attempt {attempt + 1} failed, retrying in {retry_delay} seconds..."
+                        )
+                        time.sleep(retry_delay)
+                        retry_delay *= 2  # exponential backoff
+                    else:
+                        logger.warning(
+                            f"Warning: Could not fully remove directory: {e}"
+                        )
+
+        os.makedirs(PIPELINES_DIR, exist_ok=True)
+        logger.info(f"{PIPELINES_DIR} has been recreated.")
+    else:
+        logger.debug("RESET_PIPELINES_DIR is not set to true. No action taken.")
+
+
+def install_pip_packages(packages):
+    """Install packages using pip.
+
+    Args:
+        packages: A list of packages or pip arguments.
+ """ + if not packages: + return + logger.info(f"Installing packages: {packages}") + cmd = [sys.executable, "-m", "pip", "install"] + packages + subprocess.check_call( + cmd, + stdout=subprocess.DEVNULL if SUPPRESS_PIP_OUTPUT else None, + stderr=subprocess.DEVNULL if SUPPRESS_PIP_OUTPUT else None, + ) + + +async def install_requirements(PIPELINES_REQUIREMENTS_PATH): + if PIPELINES_REQUIREMENTS_PATH and os.path.isfile(PIPELINES_REQUIREMENTS_PATH): + logger.info(f"Installing requirements from {PIPELINES_REQUIREMENTS_PATH}") + install_pip_packages(["-r", PIPELINES_REQUIREMENTS_PATH]) + else: + logger.debug( + "PIPELINES_REQUIREMENTS_PATH not specified or file not found. Skipping installation of requirements." + ) + + +async def download_pipelines(PIPELINES_URLS, PIPELINES_DIR): + if not PIPELINES_URLS or not PIPELINES_URLS.strip(): + logger.debug( + "PIPELINES_URLS not specified. Skipping pipelines download and installation." + ) + return + + if not os.path.exists(PIPELINES_DIR): + os.makedirs(PIPELINES_DIR) + + # Split and clean URLs, filtering out empty strings + pipeline_urls = [ + url.strip().strip('"') + for url in PIPELINES_URLS.split(";") + if url.strip().strip('"') + ] + + if not pipeline_urls: + logger.warning("No valid URLs found in PIPELINES_URLS. Skipping download.") + return + + for url in pipeline_urls: + await download_pipeline(url, PIPELINES_DIR) + + +async def download_pipeline(url, destination): + url = url.strip('"') + logger.info(f"Downloading pipeline files from {url} to {destination}...") + + if "github.com" in url: + if re.match(r"^https://github\.com/.+/.+/blob/.+", url): + logger.debug("Found single file from GitHub...") + # It's a single file from GitHub + raw_url = url.replace("/blob/", "/raw/") + filename = os.path.basename(url) + filepath = os.path.join(destination, filename) + logger.debug(f"Downloading {raw_url} to {filepath}") + await download_file(raw_url, filepath) + await install_frontmatter_requirements(filepath) + elif re.match(r"^https://github\.com/.+/.+/tree/.+", url): + # It's a folder from GitHub + logger.debug("Found folder from GitHub...") + await download_github_folder(url, destination) + elif re.match(r"^https://github\.com/.+/.+/archive/.+\.zip$", url): + # Download and extract the zip archive + logger.debug("Found zip archive from GitHub...") + await download_and_extract_zip(url, destination) + elif re.match(r"^https://github\.com/.+/.+$", url): + # General GitHub repository URL + logger.debug("Found GitHub repository...") + await clone_github_repo(url, destination) + else: + logger.error(f"Invalid GitHub URL format: {url}") + elif url.endswith(".py"): + # Download single .py file (not from GitHub) + filename = os.path.basename(url) + filepath = os.path.join(destination, filename) + await download_file(url, filepath) + await install_frontmatter_requirements(filepath) + else: + logger.error(f"Invalid URL format: {url}") + + +async def download_file(url, filepath): + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + if response.status != 200: + logger.error(f"Failed to download file: {url}") + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Failed to download file", + ) + + data = await response.read() + with open(filepath, "wb") as f: + f.write(data) + logger.debug(f"Downloaded {filepath}") + + +async def download_file_to_folder(url: str, dest_folder: str): + filename = os.path.basename(urlparse(url).path) + if not filename.endswith(".py"): + raise HTTPException( + 
+            status_code=status.HTTP_400_BAD_REQUEST,
+            detail="URL must point to a Python file",
+        )
+
+    file_path = os.path.join(dest_folder, filename)
+    await download_file(url, file_path)
+    return file_path
+
+
+async def download_and_extract_zip(url, destination):
+    zip_path = os.path.join(destination, "archive.zip")
+    await download_file(url, zip_path)
+    with zipfile.ZipFile(zip_path, "r") as zip_ref:
+        zip_ref.extractall(destination)
+    os.remove(zip_path)
+    logger.debug(f"Extracted archive to {destination}")
+
+
+async def clone_github_repo(url, destination):
+    try:
+        logger.debug(f"Cloning repository {url} to {destination}...")
+        git.Repo.clone_from(url, destination)
+        logger.debug(f"Cloned repository {url} to {destination}")
+    except Exception as e:
+        logger.error(f"Failed to clone repository: {e}")
+
+
+async def download_github_folder(url, destination):
+    logger.debug(f"Downloading folder from GitHub: {url}")
+    match = re.match(
+        r"^https://github\.com/(?P<owner>.+)/(?P<repo>.+)/tree/(?P<branch>[^/]+)/(?P<subdir>.+)",
+        url,
+    )
+    if not match:
+        logger.warning(f"Invalid GitHub tree URL: {url}")
+        return
+
+    owner = match.group("owner")
+    repo_name = match.group("repo")
+    branch = match.group("branch")
+    subdir = match.group("subdir")
+    git_url = f"https://github.com/{owner}/{repo_name}.git"
+    temp_dir = tempfile.mkdtemp()
+    try:
+        repo = git.Repo.clone_from(
+            git_url,
+            temp_dir,
+            no_checkout=True,
+            depth=1,
+        )
+        # Enable sparse checkout
+        repo.git.config("core.sparseCheckout", "true")
+        sparse_checkout_path = os.path.join(temp_dir, ".git", "info", "sparse-checkout")
+        with open(sparse_checkout_path, "w") as f:
+            f.write(f"{subdir}/*\n")
+        # Checkout the branch
+        repo.git.checkout(branch)
+        src_path = os.path.join(temp_dir, subdir)
+        if os.path.exists(src_path):
+            for item in os.listdir(src_path):
+                s = os.path.join(src_path, item)
+                d = os.path.join(destination, item)
+                if os.path.isdir(s):
+                    shutil.copytree(s, d, dirs_exist_ok=True)
+                else:
+                    shutil.copy2(s, d)
+            logger.debug(f"Downloaded folder {subdir} from {url}")
+        else:
+            logger.error(f"Subdirectory {subdir} not found in repository.")
+    except Exception as e:
+        logger.error(f"Failed to download folder: {e}")
+    finally:
+        shutil.rmtree(temp_dir)
+
+
+async def install_frontmatter_requirements(file_path):
+    with open(file_path, "r") as f:
+        content = f.read()
+    # Extract the first triple-quoted block
+    if content.startswith('"""'):
+        end = content.find('"""', 3)
+        if end != -1:
+            frontmatter_content = content[3:end]
+            frontmatter = parse_frontmatter(frontmatter_content)
+            requirements = frontmatter.get("requirements")
+            if requirements:
+                req_list = [req.strip() for req in requirements.split(",")]
+                if req_list:
+                    install_pip_packages(req_list)
+                else:
+                    logger.info(
+                        f"No valid requirements found in frontmatter of {file_path}."
+                    )
+            else:
+                logger.info(f"No requirements found in frontmatter of {file_path}.")
+        else:
+            logger.info(f"No frontmatter block closure in {file_path}.")
+    else:
+        logger.info(f"No frontmatter found in {file_path}.")
+
+
+# Install requirements declared in a module's frontmatter before it is loaded
+async def install_requirements_from_file(module_path):
+    with open(module_path, "r") as f:
+        content = f.read()
+    if content.startswith('"""'):
+        end = content.find('"""', 3)
+        if end != -1:
+            frontmatter_content = content[3:end]
+            frontmatter = parse_frontmatter(frontmatter_content)
+            requirements = frontmatter.get("requirements")
+            if requirements:
+                req_list = [req.strip() for req in requirements.split(",")]
+                install_pip_packages(req_list)
+            else:
+                logger.info(f"No requirements found in frontmatter of {module_path}.")
+        else:
+            logger.info(f"No closing triple quotes for frontmatter in {module_path}.")
+    else:
+        logger.info(f"No frontmatter found in {module_path}.")
diff --git a/utils/pipelines/logger.py b/utils/pipelines/logger.py
new file mode 100644
index 00000000..4c7e6c47
--- /dev/null
+++ b/utils/pipelines/logger.py
@@ -0,0 +1,38 @@
+import logging
+
+from config import LOG_LEVEL
+
+
+def setup_logger(name):
+    """
+    Create a logger instance with simplified formatting.
+
+    Args:
+        name: The name of the logger (typically __name__)
+
+    Returns:
+        logging.Logger: Configured logger instance
+    """
+    logger = logging.getLogger(name)
+
+    # Convert string level to numeric level
+    numeric_level = logging.getLevelName(LOG_LEVEL.upper())
+    logger.setLevel(numeric_level)
+
+    # Only add handler if the logger doesn't already have handlers
+    if not logger.handlers:
+        handler = logging.StreamHandler()
+
+        # Choose formatter based on log level - show name except for INFO
+        if LOG_LEVEL.upper() != "INFO":
+            formatter = logging.Formatter("%(levelname)s (%(name)s): %(message)s")
+        else:
+            formatter = logging.Formatter("%(levelname)s: %(message)s")
+
+        handler.setFormatter(formatter)
+        logger.addHandler(handler)
+
+    # Prevent log propagation to avoid duplicate logs
+    logger.propagate = False
+
+    return logger
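
Usage sketch (values are illustrative): the startup behavior that previously
lived in start.sh is now driven entirely by the environment variables
introduced in config.py. The URL below is only an example of the formats
download_pipeline() accepts (a GitHub blob/tree/archive/repo URL, or a direct
link to a .py file):

    export LOG_LEVEL="DEBUG"
    export RESET_PIPELINES_DIR="true"
    export PIPELINES_REQUIREMENTS_PATH="./requirements.txt"
    export PIPELINES_URLS="https://github.com/open-webui/pipelines/blob/main/examples/filters/detoxify_filter_pipeline.py"
    export SUPPRESS_PIP_OUTPUT="false"
    ./start.sh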