|
| 1 | +import base64 |
| 2 | +import os |
| 3 | +import re |
| 4 | +from typing import List |
| 5 | + |
| 6 | +KILOBYTE = 1024 |
| 7 | +MEGABYTE = 1024 * KILOBYTE |
| 8 | + |
| 9 | + |
| 10 | +def _process_input( |
| 11 | + input_url: str, supported_extensions: List[str], max_file_size: int |
| 12 | +) -> str: |
| 13 | + if re.match(r"^https?://", input_url): |
| 14 | + return input_url |
| 15 | + |
| 16 | + if os.path.exists(input_url): |
| 17 | + if os.path.getsize(input_url) > max_file_size: |
| 18 | + raise ValueError(f"File too large: max {max_file_size / MEGABYTE}MB") |
| 19 | + else: |
| 20 | + raise FileNotFoundError(f"File not found: {input_url}") |
| 21 | + |
| 22 | + validate_extension(input_url, supported_extensions) |
| 23 | + |
| 24 | + try: |
| 25 | + with open(input_url, "rb") as img_file: |
| 26 | + img_bytes = img_file.read() |
| 27 | + base64_data = base64.b64encode(img_bytes).decode("utf-8") |
| 28 | + |
| 29 | + return f"data:application/octet-stream;base64,{base64_data}" |
| 30 | + except Exception as e: |
| 31 | + raise ValueError(f"Error occurred while processing the file: {e}") |
| 32 | + |
| 33 | + |
| 34 | +def validate_extension(input_url: str, supported_extensions: List[str]) -> None: |
| 35 | + file_ext = input_url.lower().split(".")[-1] |
| 36 | + if file_ext not in supported_extensions: |
| 37 | + supported = ", ".join([f".{ext}" for ext in supported_extensions]) |
| 38 | + raise ValueError(f"Unsupported image extension. supported: {supported}") |
| 39 | + |
| 40 | + |
| 41 | +def create_message( |
| 42 | + input_url: str, supported_extensions: List[str], max_file_size: int |
| 43 | +) -> dict: |
| 44 | + url = _process_input(input_url, supported_extensions, max_file_size) |
| 45 | + |
| 46 | + return {"type": "image_url", "image_url": {"url": url}} |
0 commit comments