diff --git a/examples/cloud-edge-collaborative-inference-for-llm/Dockerfile b/examples/cloud-edge-collaborative-inference-for-llm/Dockerfile
new file mode 100644
index 00000000..6e29ee95
--- /dev/null
+++ b/examples/cloud-edge-collaborative-inference-for-llm/Dockerfile
@@ -0,0 +1,45 @@
+# Use Miniconda base image
+FROM continuumio/miniconda3:latest
+
+ENV CONDA_ENV=ianvs-experiment \
+    PYTHON_VERSION=3.8
+
+# Install system dependencies
+RUN apt-get update && apt-get install -y \
+    curl \
+    gnupg \
+    git \
+    unzip
+
+# Copy kaggle.json (Make sure this file is in the same directory as your Dockerfile)
+COPY kaggle.json /root/.kaggle/kaggle.json
+RUN chmod 600 /root/.kaggle/kaggle.json
+
+# Clone Ianvs repo
+RUN git clone https://github.com/kubeedge/ianvs.git
+WORKDIR /ianvs
+
+# Create conda environment with Python and Rust
+RUN conda create -y -n $CONDA_ENV python=$PYTHON_VERSION rust -c conda-forge
+
+# Install dependencies inside the conda environment and Ianvs
+RUN /bin/bash -c "source activate $CONDA_ENV && \
+    pip install examples/resources/third_party/sedna-0.6.0.1-py3-none-any.whl && \
+    pip install -r requirements.txt && \
+    pip install -r examples/cloud-edge-collaborative-inference-for-llm/requirements.txt && \
+    python setup.py install"
+
+# Install Kaggle CLI
+RUN pip install kaggle
+
+# Download dataset
+RUN cd /ianvs && \
+    kaggle datasets download -d kubeedgeianvs/ianvs-mmlu-5shot && \
+    kaggle datasets download -d kubeedgeianvs/ianvs-gpqa-diamond && \
+    unzip -o ianvs-mmlu-5shot.zip && \
+    unzip -o ianvs-gpqa-diamond.zip && \
+    rm -rf ianvs-mmlu-5shot.zip && \
+    rm -rf ianvs-gpqa-diamond.zip
+
+# Set final working directory
+WORKDIR /ianvs
diff --git a/examples/cloud-edge-collaborative-inference-for-llm/README.md b/examples/cloud-edge-collaborative-inference-for-llm/README.md
index 304bdf1b..6b8723ce 100644
--- a/examples/cloud-edge-collaborative-inference-for-llm/README.md
+++ b/examples/cloud-edge-collaborative-inference-for-llm/README.md
@@ -1,6 +1,34 @@
-# Quick Start
-
-## Introduction
+# Table of Contents
+
+- [Details of Cloud Edge Collaborative Inference for LLMs](#details-of-cloud-edge-collaborative-inference-for-llms)
+  - [Introduction](#introduction)
+  - [Why LLM Need Cloud-Edge Collaborative Inference?](#why-llm-need-cloud-edge-collaborative-inference)
+  - [Possible Collaborative Inference Strategy](#possible-collaborative-inference-strategy)
+  - [Details of Design](#details-of-design)
+- [Quick Start Guide](#quick-start-guide)
+  - [Required Resources](#required-resources)
+  - [Methods for Benchmarking with Ianvs](#methods-for-benchmarking-with-ianvs)
+  - [Docker-Based Setup](#docker-based-setup)
+  - [Detailed Setup Guide](#detailed-setup-guide)
+    - [Step 1: Ianvs Preparation](#step-1-ianvs-preparation)
+    - [Step 2: Dataset and Model Preparation](#step-2-dataset-and-model-preparation)
+      - [Dataset Configuration](#dataset-configuration)
+      - [Metric Configuration](#metric-configuration)
+      - [Model Configuration](#model-configuration)
+      - [Router Configuration](#router-configuration)
+      - [Data Processor Configuration](#data-processor-configuration)
+    - [Step 3: Run Ianvs](#step-3-run-ianvs)
+      - [Provided Response Cache](#provided-response-cache)
+      - [Run Joint Inference Example](#run-joint-inference-example)
+  - [Results](#results)
+- [Discussion](#discussion)
+  - [Query Routing's Application Scenario](#query-routings-application-scenario)
+- [Future](#future)
+- [References](#references)
+
+## Details of Cloud Edge Collaborative Inference for LLMs
+
+### Introduction
This example aims to implement benchmarks for **LLM in cloud-edge collaborative inference scenario**.
@@ -42,11 +70,13 @@ To save API calls during multi-round testing, this example has designed a result
After all tests are completed, the Test Env Manager will calculate relevant metrics based on selected Metrics and hand over to Story Manager for printing test reports and generating Leader Board.
-## Required Resources
+## Quick Start Guide
+
+### Required Resources
Before using this example, you need to have the device ready:
-One machine is all you need, i.e., a laptop or a virtual machine is sufficient and a cluster is not necessary
+- One machine is all you need, i.e., a laptop or a virtual machine is sufficient and a cluster is not necessary
- 2 CPUs or more
@@ -60,52 +90,117 @@ One machine is all you need, i.e., a laptop or a virtual machine is sufficient a
- Python 3.8+ environment
-## Step 1. Ianvs Preparation
+### Methods for Benchmarking with Ianvs
+
+- To quickly experience benchmarking with Ianvs, proceed with the [Docker-Based Setup](#docker-based-setup).
+- For a detailed setup process, including creating a custom dataset, refer to the [Detailed Setup Guide](#detailed-setup-guide).
+
+### Docker-Based Setup
+
+The Docker-based setup assumes you have Docker installed on your system and are using an Ubuntu-based Linux distribution.
+
+**Note**:
+- If you don't have Docker installed, follow the Docker Engine installation guide [here](https://docs.docker.com/engine/install/ubuntu/).
+- To enable Docker to download datasets from Kaggle within your Docker container, you need to configure the Kaggle CLI authentication token. Please follow the [official Kaggle API documentation](https://www.kaggle.com/docs/api#:~:text=is%20%24PYTHON_HOME/Scripts.-,Authentication,-In%20order%20to) to download your `kaggle.json` token. Once downloaded, move the file to the `~/ianvs/examples/cloud-edge-collaborative-inference-for-llm/` directory after completing step 1 (cloning the Ianvs repo):
```bash
-# Create a new conda environment with Python>=3.8 (venv users can do it in their own way).
-conda create -n ianvs-experiment python=3.8
+mv /path/to/kaggle.json ~/ianvs/examples/cloud-edge-collaborative-inference-for-llm/
```
-# Activate our environment
+1. Clone the Ianvs repo:
+```bash
+git clone https://github.com/kubeedge/ianvs.git
+cd ianvs
+```
+
+2. From the root directory of Ianvs, build the `cloud-edge-collaborative-inference-for-llm` Docker image:
+
+**Note**: If you have already built the image, skip ahead to the next step.
+
+```bash
+docker build -t ianvs-experiment-image ./examples/cloud-edge-collaborative-inference-for-llm/
+```
+
+3. Run the image in an interactive shell:
+```bash
+docker run -it ianvs-experiment-image /bin/bash
+```
+
+4. Activate the ianvs-experiment Conda environment:
```bash
conda activate ianvs-experiment
+```
+
+5. Set the required environment variables for the API (use either OpenAI or GROQ credentials):
+```bash
+export OPENAI_BASE_URL="https://api.openai.com/v1"
+export OPENAI_API_KEY=sk_xxxxxxxx
+```
+
+`Alternatively, for GROQ, use GROQ_BASE_URL and GROQ_API_KEY.`
+
+6. Run the Ianvs benchmark:
+```bash
+ianvs -f examples/cloud-edge-collaborative-inference-for-llm/benchmarkingjob.yaml
+```
+
+*Note: To help you get results quickly, we have provided a workspace folder with cached results for `Qwen/Qwen2.5-1.5B-Instruct`, `Qwen/Qwen2.5-3B-Instruct`, `Qwen/Qwen2.5-7B-Instruct` and `gpt-4o-mini`.*
+
+- If you want to create a custom dataset, proceed to the next section.
+
+### Detailed Setup Guide
+
+#### Step 1. Ianvs Preparation
+
```bash
# Clone Ianvs Repo
git clone https://github.com/kubeedge/ianvs.git
cd ianvs
+# Create a new conda environment with Python>=3.8 and Rust (venv users can do it in their own way).
+conda create -n ianvs-experiment python=3.8 rust -c conda-forge
+
+# Activate our environment
+conda activate ianvs-experiment
+
# Install Sedna
pip install examples/resources/third_party/sedna-0.6.0.1-py3-none-any.whl
-# Install dependencies for this example.
-pip install -r examples/cloud-edge-collaborative-inference-for-llm/requirements.txt
-
# Install dependencies for Ianvs Core.
pip install -r requirements.txt
+# Install dependencies for this example.
+pip install -r examples/cloud-edge-collaborative-inference-for-llm/requirements.txt
+
# Install ianvs
python setup.py install
```
If you want to use speculative decoding models like [EAGLE](https://github.com/SafeAILab/EAGLE), refer to the original repository for setup instructions.
-## Step 2. Dataset and Model Preparation
-
-### Dataset Configuration
+#### Step 2. Dataset and Model Preparation
-Here, we provide `MMLU-5-shot` dataset and `GPQA-diamond` dataset for testing. The following is the instruction for dataset preparation for `MMLU-5-shot`, `GPQA-diamond` follows the same progress.
+##### Dataset Configuration
-1. Download `mmlu-5-shot` from [Ianvs-MMLU-5-shot](https://huggingface.co/datasets/FuryMartin/Ianvs-MMLU-5-shot), (or [Ianvs-GPQA-diamond](https://huggingface.co/datasets/FuryMartin/Ianvs-GPQA-diamond)) which is a transformed MMLU-5-shot dataset formatted to fit Ianvs's requirements.
+Here, we provide the `MMLU-5-shot` and `GPQA-diamond` datasets for testing. The following instructions cover dataset preparation for `MMLU-5-shot`; `GPQA-diamond` follows the same process.
-2. Create a `dataset` folder in the root directory of Ianvs and move `mmlu-5-shot` into the `dataset` folder.
+1. In the root directory of Ianvs, download `mmlu-5-shot` from [Ianvs-MMLU-5-shot](https://www.kaggle.com/datasets/kubeedgeianvs/ianvs-mmlu-5shot), which is a transformed MMLU-5-shot dataset formatted to fit Ianvs's requirements.
+**Note**: To download datasets from Kaggle with the Kaggle CLI, you need to configure the Kaggle authentication token. Please follow the [official Kaggle API documentation](https://www.kaggle.com/docs/api#:~:text=is%20%24PYTHON_HOME/Scripts.-,Authentication,-In%20order%20to) to download your `kaggle.json` token.
+```bash
+kaggle datasets download -d kubeedgeianvs/ianvs-mmlu-5shot
+unzip -o ianvs-mmlu-5shot.zip
+rm -rf ianvs-mmlu-5shot.zip
+```
-3. Then, check the path of `train_data` and `test_dat` in
+2. Then, check the path of `train_data` and `test_data` in
`examples/cloud-edge-collaborative-inference-for-llm/testenv/testenv.yaml`.
- If you created the `dataset` folder inside `ianvs/` as mentioned earlier, then the relative path is correct and does not need to be modified.
- If your `dataset` is created in a different location, please use an absolute path, and using `~` to represent the home directory is not supported.
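To double-check that these paths resolve before launching a run, a quick look such as the one below can help. This is a minimal sketch: it assumes the Kaggle archive was unpacked into a `dataset/` folder under the Ianvs root, as the notes above suggest; adjust the paths if your layout differs.

```bash
# List the unpacked dataset folder (the location is an assumption; change it if you unpacked elsewhere).
ls dataset/

# Show the data paths this example is currently configured to read.
grep -E "train_data|test_data" examples/cloud-edge-collaborative-inference-for-llm/testenv/testenv.yaml
```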
-#### Dataset Details +###### Dataset Details If you want to construct your own dataset, please see the details below and follow the instruction. @@ -146,7 +241,7 @@ Here is an example: } ``` -### Metric Configuration +##### Metric Configuration *Note: If you just want to run this example quickly, you can skip this step.* @@ -168,7 +263,7 @@ Each metric is calculated by a module in `examples/cloud-edge-collaborative-infe You can select multiple metrics in `examples/cloud-edge-collaborative-inference-for-llm/testenv/testenv.yaml`. -### Model Configuration +##### Model Configuration *Note: If you just want to run this example quickly, you can skip this step.* @@ -176,7 +271,7 @@ The models are configured in `examples/cloud-edge-collaborative-inference-for-ll In the configuration file, there are two models available for configuration: `EdgeModel` and `CloudModel`. -#### EdgeModel Configuration +##### EdgeModel Configuration The `EdgeModel` is designed to be deployed on your local machine, offering support for multiple serving backends including `huggingface`, `vllm`, `EagleSpecDec`. Additionally, it provides the flexibility to integrate with API-based model services. @@ -194,7 +289,7 @@ For both `EdgeModel`, the arguments are: | gpu_memory_utilization | float | The percentage of GPU memory utilization (Used for vLLM) | 0.9 | | draft_model | str | The draft model used for Speculative Decoding | - | -#### CloudModel Configuration +##### CloudModel Configuration The `CloudModel` represents the model on cloud, it will call LLM API via OpenAI API format. You need to set your OPENAI_BASE_URL and OPENAI_API_KEY in the environment variables yourself, for example. @@ -204,6 +299,8 @@ export OPENAI_BASE_URL="https://api.openai.com/v1" export OPENAI_API_KEY=sk_xxxxxxxx ``` +`Alternatively, for GROQ, use GROQ_BASE_URL and GROQ_API_KEY.` + For `CloudModel`, the open parameters are: | Parameter Name | Type | Description | Defalut | @@ -214,8 +311,7 @@ For `CloudModel`, the open parameters are: | max_tokens | int | The maximum number of tokens that can be generated in the chat completion | 512 | | repetition_penalty | float | The parameter for repetition penalty | 1.05 | - -#### Router Configuration +##### Router Configuration Router is a component that routes the query to the edge or cloud model. The router is configured by `hard_example_mining` in `examples/cloud-edge-collaborative-inference-for-llm/testrouters/query-routing/test_queryrouting.yaml`. @@ -231,9 +327,9 @@ Currently, supported routers include: You can modify the `router` parameter in `test_queryrouting.yaml` to select the router you want to use. -For BERT router, you can use [routellm/bert](https://huggingface.co/routellm/bert) or [routellm/bert_mmlu_augmented](https://huggingface.co/routellm/bert_mmlu_augmented) or your own BERT model/ +For BERT router, you can use [routellm/bert](https://huggingface.co/routellm/bert) or [routellm/bert_mmlu_augmented](https://huggingface.co/routellm/bert_mmlu_augmented) or your own BERT model. -#### Data Processor Configuration +##### Data Processor Configuration The Data Processor allows you to custom your own data format after the dataset loaded. Currently, supported routers include: @@ -242,16 +338,17 @@ Currently, supported routers include: | :---: | :---: | :---: | | OracleRouterDatasetProcessor | Expose `gold` label to OracleRouter | - | -## Step 3. Run Ianvs +#### Step 3. 
Run Ianvs -### Provided Response Cache +##### Provided Response Cache The testing process may take much time, depending on the number of test cases and the inference speed of the model. To enable you directly get the results, here we provide a workspace folder with cached results of `Qwen/Qwen2.5-1.5B-Instruct`, `Qwen/Qwen2.5-3B-Instruct`,`Qwen/Qwen2.5-7B-Instruct` and `gpt-4o-mini`. -You can download `workspace-mmlu` folder from [Ianvs-MMLU-5-shot](https://huggingface.co/datasets/FuryMartin/Ianvs-MMLU-5-shot) and put it under your `ianvs` folder. +You can download `workspace-mmlu` folder from [Ianvs-MMLU-5-shot](https://www.kaggle.com/datasets/kubeedgeianvs/ianvs-mmlu-5shot) and put it under your `ianvs` folder. +- Since we have already downloaded the `Ianvs-MMLU-5-shot` folder. There is no need to do this again. -### Run Joint Inference example +##### Run Joint Inference example Run the following command: @@ -260,14 +357,30 @@ Run the following command: After the process finished, you will see output like this: ```bash -[2024-10-28 18:03:37,314] edge_model.py(43) [INFO] - {'model': 'Qwen/Qwen2.5-1.5B-Instruct', 'backend': 'vllm', 'temperature': 0, 'top_p': 0.8, 'max_tokens': 512, 'repetition_penalty': 1.05, 'tensor_parallel_size': 4, 'gpu_memory_utilization': 0.9, 'use_cache': True} -[2024-10-28 18:03:37,314] cloud_model.py(34) [INFO] - {'model': 'gpt-4o-mini', 'temperature': 0, 'top_p': 0.8, 'max_tokens': 512, 'repetition_penalty': 1.05, 'use_cache': True} -[2024-10-28 18:03:37,850] joint_inference.py(73) [INFO] - Loading dataset -[2024-10-28 18:03:38,703] hard_sample_mining.py(30) [INFO] - USING EdgeOnlyFilter -[2024-10-28 18:03:38,704] joint_inference.py(162) [INFO] - Inference Start -100%|██████████████████████████████████| 14042/14042 [00:02<00:00, 6182.92it/s, Edge=14042, Cloud=0] -[2024-10-28 18:03:40,975] joint_inference.py(186) [INFO] - Inference Finished -[2024-10-28 18:03:40,976] joint_inference.py(131) [INFO] - Release models +[2025-04-12 09:20:14,523] edge_model.py(43) [INFO] - {'model': 'Qwen/Qwen2.5-1.5B-Instruct', 'backend': 'vllm', 'temperature': 0, 'top_p': 0.8, 'max_tokens': 512, 'repetition_penalty': 1.05, 'tensor_parallel_size': 4, 'gpu_memory_utilization': 0.9, 'use_cache': True} +[2025-04-12 09:20:14,524] cloud_model.py(34) [INFO] - {'model': 'gpt-4o-mini', 'temperature': 0, 'top_p': 0.8, 'max_tokens': 512, 'repetition_penalty': 1.05, 'use_cache': True} +[2025-04-12 09:20:14,880] joint_inference.py(73) [INFO] - Loading dataset +[2025-04-12 09:20:15,943] hard_sample_mining.py(30) [INFO] - USING EdgeOnlyFilter +[2025-04-12 09:20:15,943] joint_inference.py(162) [INFO] - Inference Start +100%|██████████████████████████████████| 14042/14042 [00:03<00:00, 4418.66it/s, Edge=14042, Cloud=0] +[2025-04-12 09:20:19,122] joint_inference.py(186) [INFO] - Inference Finished +[2025-04-12 09:20:19,122] joint_inference.py(131) [INFO] - Release models +[2025-04-12 09:20:23,844] edge_model.py(43) [INFO] - {'model': 'Qwen/Qwen2.5-3B-Instruct', 'backend': 'vllm', 'temperature': 0, 'top_p': 0.8, 'max_tokens': 512, 'repetition_penalty': 1.05, 'tensor_parallel_size': 4, 'gpu_memory_utilization': 0.9, 'use_cache': True} +[2025-04-12 09:20:23,844] cloud_model.py(34) [INFO] - {'model': 'gpt-4o-mini', 'temperature': 0, 'top_p': 0.8, 'max_tokens': 512, 'repetition_penalty': 1.05, 'use_cache': True} +[2025-04-12 09:20:23,851] joint_inference.py(73) [INFO] - Loading dataset +[2025-04-12 09:20:24,845] hard_sample_mining.py(30) [INFO] - USING EdgeOnlyFilter +[2025-04-12 09:20:24,845] 
joint_inference.py(162) [INFO] - Inference Start +100%|██████████████████████████████████| 14042/14042 [00:03<00:00, 4413.68it/s, Edge=14042, Cloud=0] +[2025-04-12 09:20:28,027] joint_inference.py(186) [INFO] - Inference Finished +[2025-04-12 09:20:28,027] joint_inference.py(131) [INFO] - Release models +[2025-04-12 09:20:32,741] edge_model.py(43) [INFO] - {'model': 'Qwen/Qwen2.5-7B-Instruct', 'backend': 'vllm', 'temperature': 0, 'top_p': 0.8, 'max_tokens': 512, 'repetition_penalty': 1.05, 'tensor_parallel_size': 4, 'gpu_memory_utilization': 0.9, 'use_cache': True} +[2025-04-12 09:20:32,741] cloud_model.py(34) [INFO] - {'model': 'gpt-4o-mini', 'temperature': 0, 'top_p': 0.8, 'max_tokens': 512, 'repetition_penalty': 1.05, 'use_cache': True} +[2025-04-12 09:20:32,749] joint_inference.py(73) [INFO] - Loading dataset +[2025-04-12 09:20:33,738] hard_sample_mining.py(30) [INFO] - USING EdgeOnlyFilter +[2025-04-12 09:20:33,738] joint_inference.py(162) [INFO] - Inference Start +100%|██████████████████████████████████| 14042/14042 [00:03<00:00, 4456.34it/s, Edge=14042, Cloud=0] +[2025-04-12 09:20:36,890] joint_inference.py(186) [INFO] - Inference Finished +[2025-04-12 09:20:36,890] joint_inference.py(131) [INFO] - Release models ``` ### Results @@ -331,7 +444,6 @@ You can modify and run `performance-cost-plot.py` to get your Performance-Cost f Some related research $^{[1]}$ has trained pratical routers that can save up to 40% of GPT-4 API calls while maintaining essentially unchanged accuracy on the test set. - ## Future This example builds an architecture for testing query routing strategies, but the provided dataset has some drawbacks such as being one-sided and singular, making it difficult to reflect effects in real-world scenarios. @@ -343,9 +455,7 @@ Thus, the future tasks of this example include: - Build a more comprehensive dataset for better router evaluation - Try to consider a native Speculative Decoding in cloud-edge collaborative inference scenario. - - -**Reference** +## References [1] Ding, Dujian, et al. "Hybrid LLM: Cost-efficient and quality-aware query routing." *arXiv preprint arXiv:2404.14618* (2024). diff --git a/examples/cloud-edge-collaborative-inference-for-llm/requirements.txt b/examples/cloud-edge-collaborative-inference-for-llm/requirements.txt index 58fc617d..c5fafad1 100644 --- a/examples/cloud-edge-collaborative-inference-for-llm/requirements.txt +++ b/examples/cloud-edge-collaborative-inference-for-llm/requirements.txt @@ -2,4 +2,6 @@ vllm transformers openai accelerate -datamodel_code_generator \ No newline at end of file +datamodel_code_generator +kaggle +groq \ No newline at end of file diff --git a/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/cloud_model.py b/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/cloud_model.py index f466b367..f9c3243d 100644 --- a/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/cloud_model.py +++ b/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/cloud_model.py @@ -31,9 +31,18 @@ class CloudModel: def __init__(self, **kwargs): """Initialize the CloudModel. See `APIBasedLLM` for details about `kwargs`. 
""" - LOGGER.info(kwargs) - self.model = APIBasedLLM(**kwargs) - self.load(kwargs.get("model", "gpt-4o-mini")) + LOGGER.info("Initializing CloudModel with kwargs: %s", kwargs) + try: + self.model = APIBasedLLM(**kwargs) + except Exception as e: + LOGGER.error("Failed to initialize APIBasedLLM: %s", str(e)) + raise RuntimeError("Could not initialize APIBasedLLM. Check your credentials or configuration.") from e + model_name = kwargs.get("model", "").strip() + if not model_name: + LOGGER.warning("No 'model' specified in kwargs. Falling back to default 'gpt-4o-mini'.") + model_name = "gpt-4o-mini" + + self.load(model_name) def load(self, model): """Set the model. @@ -43,7 +52,15 @@ def load(self, model): model : str Existing model from your OpenAI provider. Example: `gpt-4o-mini` """ - self.model._load(model = model) + if not model or not isinstance(model, str): + raise ValueError("Model name must be a non-empty string.") + + try: + self.model._load(model=model) + LOGGER.info("Model '%s' loaded successfully.", model) + except Exception as e: + LOGGER.error("Error loading model '%s': %s", model, str(e)) + raise RuntimeError(f"Failed to load model '{model}'.") from e def inference(self, data, **kwargs): """Inference the model with the given data. @@ -60,12 +77,21 @@ def inference(self, data, **kwargs): dict Formatted Response. See `model._format_response()` for more details. """ + if not isinstance(data, dict): + raise ValueError("Input data for inference must be a dictionary.") - return self.model.inference(data) + try: + return self.model.inference(data) + except Exception as e: + LOGGER.error("Inference failed: %s", str(e)) + raise RuntimeError("Inference failed. Check input data format and model readiness.") from e def cleanup(self): """Save the cache and cleanup the model. """ - - self.model.save_cache() - self.model.cleanup() \ No newline at end of file + try: + self.model.save_cache() + self.model.cleanup() + LOGGER.info("Cleanup completed successfully.") + except Exception as e: + LOGGER.warning("Cleanup encountered an issue: %s", str(e)) \ No newline at end of file diff --git a/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/data_processor.py b/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/data_processor.py index ee29ed40..0e487e0b 100644 --- a/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/data_processor.py +++ b/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/data_processor.py @@ -35,5 +35,9 @@ def __call__(self, dataset): sedna.datasources.BaseDataSource Transformed dataset """ - dataset.x = [{"query": x, "gold": y} for x,y in zip(dataset.x, dataset.y)] + try: + dataset.x = [{"query": x, "gold": y} for x, y in zip(dataset.x, dataset.y)] + except Exception as e: + raise RuntimeError("Failed to transform dataset for Oracle Router.") from e + return dataset \ No newline at end of file diff --git a/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/edge_model.py b/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/edge_model.py index b830d803..d62b4973 100644 --- a/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/edge_model.py +++ b/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/edge_model.py @@ -40,16 +40,20 @@ def __init__(self, **kwargs): - `backend`: str, default "huggingface". The serving framework to be used. 
""" - LOGGER.info(kwargs) + LOGGER.info("Initializing EdgeModel with kwargs: %s", kwargs) self.kwargs = kwargs self.model_name = kwargs.get("model", None) self.backend = kwargs.get("backend", "huggingface") + if self.backend not in ["huggingface", "vllm", "api"]: + raise ValueError( + f"Unsupported backend: {self.backend}. Supported options are: 'huggingface', 'vllm', 'api'." + ) self._set_config() def _set_config(self): """Set the model path in our environment variables due to Sedna’s [check](https://github.com/kubeedge/sedna/blob/ac623ab32dc37caa04b9e8480dbe1a8c41c4a6c2/lib/sedna/core/base.py#L132). """ - # + os.environ["model_path"] = self.model_name def load(self, **kwargs): @@ -60,18 +64,20 @@ def load(self, **kwargs): Exception When the backend is not supported. """ - if self.backend == "huggingface": - self.model = HuggingfaceLLM(**self.kwargs) - elif self.backend == "vllm": - self.model = VllmLLM(**self.kwargs) - elif self.backend == "api": - self.model = APIBasedLLM(**self.kwargs) - elif self.backend == "EagleSpecDec": - self.model = EagleSpecDecModel(**self.kwargs) - elif self.backend == "LadeSpecDec": - self.model = LadeSpecDecLLM(**self.kwargs) - else: - raise Exception(f"Backend {self.backend} is not supported. Please use 'huggingface', 'vllm', or `api` ") + try: + if self.backend == "huggingface": + self.model = HuggingfaceLLM(**self.kwargs) + elif self.backend == "vllm": + self.model = VllmLLM(**self.kwargs) + elif self.backend == "api": + self.model = APIBasedLLM(**self.kwargs) + elif self.backend == "EagleSpecDec": + self.model = EagleSpecDecModel(**self.kwargs) + elif self.backend == "LadeSpecDec": + self.model = LadeSpecDecLLM(**self.kwargs) + except Exception as e: + LOGGER.error(f"Failed to initialize model backend `{self.backend}`: {str(e)}") + raise RuntimeError(f"Model loading failed for backend `{self.backend}`.") from e def predict(self, data, **kwargs): """Inference the model with the given data. @@ -89,13 +95,18 @@ def predict(self, data, **kwargs): Formatted Response. See `model._format_response()` for more details. """ - answer = self.model.inference(data) - - return answer - + try: + return self.model.inference(data) + except Exception as e: + LOGGER.error(f"Inference failed: {e}") + raise RuntimeError("Inference failed due to an internal error.") from e + def cleanup(self): """Save the cache and cleanup the model. 
""" - self.model.save_cache() - self.model.cleanup() + try: + self.model.save_cache() + self.model.cleanup() + except Exception as e: + LOGGER.warning(f"Cleanup failed: {e}") diff --git a/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/hard_sample_mining.py b/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/hard_sample_mining.py index 469c9f3e..4bb09a6a 100644 --- a/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/hard_sample_mining.py +++ b/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/hard_sample_mining.py @@ -15,13 +15,16 @@ """Hard Example Mining Algorithms""" import abc +import torch import random from transformers import pipeline from sedna.common.class_factory import ClassFactory, ClassType from core.common.log import LOGGER __all__ = ('BERTFilter', 'EdgeOnlyFilter', 'CloudOnlyFilter', - 'RandomRouterFilter', 'OracleRouterFilter') + 'RandomRouterFilter', 'OracleRouterFilter', 'ResourceSensitiveRouterFilter') + +device = "cuda" if torch.cuda.is_available() else "cpu" class BaseFilter(metaclass=abc.ABCMeta): """The base class to define unified interface.""" @@ -73,7 +76,11 @@ def __init__(self, **kwargs): self.task = kwargs.get("task", "text-classification") self.max_length = kwargs.get("max_length", 512) - self.classifier = pipeline(self.task, model=self.model, device="cuda") + try: + self.classifier = pipeline(self.task, model=self.model, device=device) + except Exception as e: + LOGGER.error(f"Failed to initialize the pipeline: {e}") + raise RuntimeError("Pipeline initialization failed. Please check the model and task parameters.") def _text_classification_postprocess(self, result): """Postprocess the text classification result @@ -182,7 +189,10 @@ class RandomRouterFilter(BaseFilter, abc.ABC): """ def __init__(self, **kwargs): super().__init__(**kwargs) - self.threshold = kwargs.get("threshold", 0) + self.threshold = kwargs.get("threshold", 0.5) + if not (0 <= self.threshold <= 1): + LOGGER.error("Threshold must be between 0 and 1. Defaulting to 0.5.") + self.threshold = 0.5 def __call__(self, data=None) -> bool: return False if random.random() < self.threshold else True @@ -200,7 +210,10 @@ def __init__(self, **kwargs): self.edge_model = kwargs.get("edgemodel") self.cloud_model = kwargs.get("cloudmodel") - + if not self.edge_model or not self.cloud_model: + LOGGER.error("Both edge and cloud models must be provided.") + raise ValueError("Edge and cloud models are required for OracleRouterFilter.") + def __call__(self, data=None): """Route the query to edge or cloud based on the models' prediction. @@ -251,3 +264,51 @@ def cleanup(self): f"Cloud Better: {self.cloud_better}" ] LOGGER.info("".join(message)) + +@ClassFactory.register(ClassType.HEM, alias="ResourceSensitiveRouter") +class ResourceSensitiveRouterFilter(BaseFilter, abc.ABC): + """ + A resource-aware router that adapts routing based on real-time edge device constraints. + Routes to cloud if edge device is under resource pressure; otherwise, processes locally. 
+ """ + def __init__(self, **kwargs): + super().__init__(**kwargs) + + # Thresholds can be adjusted based on empirical device behavior + self.temperature_threshold = kwargs.get("temperature_threshold", 75) # in °C + self.battery_threshold = kwargs.get("battery_threshold", 20) # in % + self.cpu_threshold = kwargs.get("cpu_threshold", 85) # in % + self.memory_threshold = kwargs.get("memory_threshold", 85) # in % + + # These can be real checks in production; here we simulate/mock for demonstration + self.resource_monitor = kwargs.get("resource_monitor", self.mock_resource_monitor) + + def __call__(self, data=None) -> bool: + """ + Route based on resource constraints. + Returns True for hard sample (go to cloud), False for easy sample (stay on edge). + """ + resources = self.resource_monitor() + + is_overloaded = ( + resources["temperature"] > self.temperature_threshold or + resources["battery"] < self.battery_threshold or + resources["cpu"] > self.cpu_threshold or + resources["memory"] > self.memory_threshold + ) + + if is_overloaded: + LOGGER.info("Routing to cloud due to resource constraints.") + else: + LOGGER.info("Sufficient edge resources, processing locally.") + + return is_overloaded # True means cloud (hard), False means edge (easy) + + def mock_resource_monitor(self): + """Mock resource monitor that simulates device conditions.""" + return { + "temperature": random.uniform(40, 140), + "battery": random.uniform(5, 100), + "cpu": random.uniform(10, 100), + "memory": random.uniform(10, 100) + } \ No newline at end of file diff --git a/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/models/api_llm.py b/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/models/api_llm.py index 9743cef0..8eeafd64 100644 --- a/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/models/api_llm.py +++ b/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/models/api_llm.py @@ -16,6 +16,7 @@ import time from openai import OpenAI +from groq import Groq from models.base_llm import BaseLLM from retry import retry @@ -24,14 +25,31 @@ def __init__(self, **kwargs) -> None: """ Initialize the APIBasedLLM class """ BaseLLM.__init__(self, **kwargs) - - api_key=os.environ.get("OPENAI_API_KEY") - base_url=os.environ.get("OPENAI_BASE_URL") - - self.client = OpenAI( - api_key=api_key, - base_url=base_url - ) + + self.provider = kwargs.get("api_provider", "openai").lower() + self.api_key_env = kwargs.get("api_key_env", "OPENAI_API_KEY") + self.api_base_url = kwargs.get("api_base_url", "OPENAI_BASE_URL") + + api_key = os.environ.get(self.api_key_env) + base_url = os.environ.get(self.api_base_url) + + if not api_key: + raise ValueError(f"API key not found in environment variable: {self.api_key_env}") + if not base_url: + raise ValueError(f"Base URL not found in environment variable: {self.api_base_url}") + + if self.provider == "groq": + try: + self.client = Groq(api_key=api_key, base_url=base_url) + except Exception as e: + raise RuntimeError(f"Failed to initialize Groq client: {e}") + elif self.provider == "openai": + try: + self.client = OpenAI(api_key=api_key, base_url=base_url) + except Exception as e: + raise RuntimeError(f"Failed to initialize OpenAI client: {e}") + else: + raise ValueError(f"Unsupported provider: {self.provider}") def _load(self, model): """Set the model to be used. 
@@ -67,35 +85,61 @@ def _infer(self, messages): st = time.perf_counter() most_recent_timestamp = st generated_text = "" + try: + if self.provider == "openai": + stream = self.client.chat.completions.create( + messages=messages, + model=self.model, + temperature=self.temperature, + max_tokens=self.max_tokens, + top_p=self.top_p, + frequency_penalty=self.repetition_penalty, + stream=True, + stream_options={"include_usage": True} + ) + elif self.provider == "groq": + stream = self.client.chat.completions.create( + messages=messages, + model=self.model, + temperature=self.temperature, + max_tokens=self.max_tokens, + top_p=self.top_p, + frequency_penalty=self.repetition_penalty, + stream=True + ) + else: + raise ValueError(f"Unsupported provider: {self.provider}") + + for chunk in stream: + timestamp = time.perf_counter() + if time_to_first_token == 0.0: + time_to_first_token = timestamp - st + else: + internal_token_latency.append(timestamp - most_recent_timestamp) + most_recent_timestamp = timestamp + + if chunk.choices: + generated_text += chunk.choices[0].delta.content or "" + if self.provider == "openai" and chunk.usage: + usage = chunk.usage + + text = generated_text + if self.provider == "openai": + prompt_tokens = usage.prompt_tokens + completion_tokens = usage.completion_tokens + else: + prompt_tokens = len(messages[0]['content'].split()) # Approximate + completion_tokens = len(text.split()) # Approximate - stream = self.client.chat.completions.create( - messages = messages, - model=self.model, - temperature=self.temperature, - max_tokens=self.max_tokens, - top_p=self.top_p, - frequency_penalty=self.repetition_penalty, - stream=True, - stream_options={"include_usage":True} - ) - - for chunk in stream: - timestamp = time.perf_counter() - if time_to_first_token == 0.0: - time_to_first_token = time.perf_counter() - st + if internal_token_latency: + internal_token_latency = sum(internal_token_latency) / len(internal_token_latency) + throughput = 1 / internal_token_latency else: - internal_token_latency.append(timestamp - most_recent_timestamp) - most_recent_timestamp = timestamp - if chunk.choices: - generated_text += chunk.choices[0].delta.content or "" - if chunk.usage: - usage = chunk.usage - - text = generated_text - prompt_tokens = usage.prompt_tokens - completion_tokens = usage.completion_tokens - internal_token_latency = sum(internal_token_latency) / len(internal_token_latency) - throughput = 1 / internal_token_latency + internal_token_latency = 0 + throughput = 0 + + except Exception as e: + raise RuntimeError(f"Error during API inference: {e}") response = self._format_response( text, @@ -106,4 +150,4 @@ def _infer(self, messages): throughput ) - return response + return response \ No newline at end of file diff --git a/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/models/base_llm.py b/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/models/base_llm.py index 416bb46a..7ce8e765 100644 --- a/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/models/base_llm.py +++ b/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/models/base_llm.py @@ -318,4 +318,4 @@ def save_cache(self): def cleanup(self): """Default Cleanup Method to release resources """ - pass + pass \ No newline at end of file diff --git a/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/models/huggingface_llm.py 
b/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/models/huggingface_llm.py index 0d3f3b6a..f39860ae 100644 --- a/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/models/huggingface_llm.py +++ b/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/models/huggingface_llm.py @@ -14,12 +14,13 @@ import os import time +import torch from threading import Thread - +from core.common.log import LOGGER from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer from models.base_llm import BaseLLM -device = "cuda" +device = "cuda" if torch.cuda.is_available() else "cpu" os.environ["TOKENIZERS_PARALLELISM"] = "true" class HuggingfaceLLM(BaseLLM): @@ -41,16 +42,19 @@ def _load(self, model): model : str Hugging Face style model name. Example: `Qwen/Qwen2.5-0.5B-Instruct` """ - self.model = AutoModelForCausalLM.from_pretrained( - model, - torch_dtype="auto", - device_map="auto", - trust_remote_code=True - ) - self.tokenizer = AutoTokenizer.from_pretrained( - model, - trust_remote_code=True - ) + try: + self.model = AutoModelForCausalLM.from_pretrained( + model, + torch_dtype="auto", + device_map="auto", + trust_remote_code=True + ) + self.tokenizer = AutoTokenizer.from_pretrained( + model, + trust_remote_code=True + ) + except Exception as e: + raise RuntimeError(f"Failed to load model '{model}': {e}") def _infer(self, messages): """Call the transformers inference API to get the response @@ -72,46 +76,50 @@ def _infer(self, messages): st = time.perf_counter() most_recent_timestamp = st - # messages = self.get_message_chain(question, system_prompt) - text = self.tokenizer.apply_chat_template( - messages, - tokenize=False, - add_generation_prompt=True - ) - - model_inputs = self.tokenizer([text], return_tensors="pt").to(device) - - streamer = TextIteratorStreamer(self.tokenizer, skip_prompt=True) - - generation_kwargs = dict( - model_inputs, - streamer=streamer, - max_new_tokens=self.max_tokens, - temperature=self.temperature, - top_p=self.top_p, - repetition_penalty=self.repetition_penalty, - ) - - thread = Thread( - target=self.model.generate, - kwargs=generation_kwargs - ) - - thread.start() - time_to_first_token = 0 - internal_token_latency = [] - generated_text = "" - completion_tokens = 0 - - for chunk in streamer: - timestamp = time.perf_counter() - if time_to_first_token == 0: - time_to_first_token = time.perf_counter() - st - else: - internal_token_latency.append(timestamp - most_recent_timestamp) - most_recent_timestamp = timestamp - generated_text += chunk - completion_tokens += 1 + try: + # messages = self.get_message_chain(question, system_prompt + text = self.tokenizer.apply_chat_template( + messages, + tokenize=False, + add_generation_prompt=True + ) + + model_inputs = self.tokenizer([text], return_tensors="pt").to(device) + + streamer = TextIteratorStreamer(self.tokenizer, skip_prompt=True) + + generation_kwargs = dict( + model_inputs, + streamer=streamer, + max_new_tokens=self.max_tokens, + temperature=self.temperature, + top_p=self.top_p, + repetition_penalty=self.repetition_penalty, + ) + + thread = Thread( + target=self.model.generate, + kwargs=generation_kwargs + ) + + thread.start() + time_to_first_token = 0 + internal_token_latency = [] + generated_text = "" + completion_tokens = 0 + + for chunk in streamer: + timestamp = time.perf_counter() + if time_to_first_token == 0: + time_to_first_token = time.perf_counter() - st + else: + internal_token_latency.append(timestamp 
- most_recent_timestamp) + most_recent_timestamp = timestamp + generated_text += chunk + completion_tokens += 1 + + except Exception as e: + raise RuntimeError(f"Inference failed: {e}") text = generated_text.replace("<|im_end|>", "") prompt_tokens = len(model_inputs.input_ids[0]) @@ -132,4 +140,4 @@ def _infer(self, messages): if __name__ == "__main__": model = HuggingfaceLLM() model._load("Qwen/Qwen2-7B-Instruct") - print(model._infer("Hello, how are you?")) \ No newline at end of file + LOGGER.info(model._infer("Hello, how are you?")) diff --git a/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/models/vllm_llm.py b/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/models/vllm_llm.py index 5d572306..ea9839c8 100644 --- a/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/models/vllm_llm.py +++ b/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/models/vllm_llm.py @@ -13,7 +13,7 @@ # limitations under the License. import os - +import torch from vllm import LLM, SamplingParams from vllm.distributed.parallel_state import destroy_model_parallel, destroy_distributed_environment from models.base_llm import BaseLLM @@ -22,7 +22,7 @@ os.environ["TOKENIZERS_PARALLELISM"] = "true" os.environ["VLLM_LOGGING_LEVEL"] = "ERROR" -device = "cuda" +device = "cuda" if torch.cuda.is_available() else "cpu" class VllmLLM(BaseLLM): def __init__(self, **kwargs) -> None: @@ -77,12 +77,15 @@ def warmup(self): """Warm up the Model for more accurate performance metrics """ - self.model.chat( - [{"role": "user", "content": "Hello"}], - self.sampling_params, - use_tqdm=False - ) - + try: + self.model.chat( + [{"role": "user", "content": "Hello"}], + self.sampling_params, + use_tqdm=False + ) + except Exception as e: + raise RuntimeError(f"Warmup failed: {e}") + def _infer(self, messages): """Call the vLLM Offline Inference API to get the response @@ -133,8 +136,10 @@ def _infer(self, messages): def cleanup(self): """Release the model from GPU """ - destroy_model_parallel() - destroy_distributed_environment() - - if hasattr(self, "model"): - del self.model.llm_engine.model_executor + try: + destroy_model_parallel() + destroy_distributed_environment() + if hasattr(self, "model"): + del self.model.llm_engine.model_executor + except Exception as e: + raise RuntimeError(f"Cleanup failed: {e}") \ No newline at end of file diff --git a/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/test_queryrouting.yaml b/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/test_queryrouting.yaml index 642e3763..650f3a01 100644 --- a/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/test_queryrouting.yaml +++ b/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/test_queryrouting.yaml @@ -70,9 +70,18 @@ algorithm: hyperparameters: # name of the hyperparameter; string type; + - api_provider: + values: + - "openai" - model: values: - "gpt-4o-mini" + - api_key_env: + values: + - "OPENAI_API_KEY" + - api_base_url: + values: + - "OPENAI_BASE_URL" - temperature: values: - 0.9 diff --git a/requirements.txt b/requirements.txt index 0cef1203..ab20c495 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,4 @@ numpy pandas tqdm matplotlib +onnx \ No newline at end of file