45 changes: 45 additions & 0 deletions examples/cloud-edge-collaborative-inference-for-llm/Dockerfile
@@ -0,0 +1,45 @@
# Use Miniconda base image
FROM continuumio/miniconda3:latest

ENV CONDA_ENV=ianvs-experiment \
PYTHON_VERSION=3.8

# Install system dependencies
RUN apt-get update && apt-get install -y \
curl \
gnupg \
git \
unzip

# Copy kaggle.json (Make sure this file is in the same directory as your Dockerfile)
COPY kaggle.json /root/.kaggle/kaggle.json
RUN chmod 600 /root/.kaggle/kaggle.json

# Clone Ianvs repo
RUN git clone https://github.com/kubeedge/ianvs.git
WORKDIR /ianvs

# Create conda environment with Python and Rust
RUN conda create -y -n $CONDA_ENV python=$PYTHON_VERSION rust -c conda-forge

# Install dependencies inside the conda environment and Ianvs
RUN /bin/bash -c "source activate $CONDA_ENV && \
pip install examples/resources/third_party/sedna-0.6.0.1-py3-none-any.whl && \
pip install -r requirements.txt && \
pip install -r examples/cloud-edge-collaborative-inference-for-llm/requirements.txt && \
python setup.py install"

# Install the Kaggle CLI
RUN pip install kaggle

# Download dataset
RUN cd /ianvs && \
kaggle datasets download -d kubeedgeianvs/ianvs-mmlu-5shot && \
kaggle datasets download -d kubeedgeianvs/ianvs-gpqa-diamond && \
unzip -o ianvs-mmlu-5shot.zip && \
unzip -o ianvs-gpqa-diamond.zip && \
rm -f ianvs-mmlu-5shot.zip ianvs-gpqa-diamond.zip

# Set final working directory
WORKDIR /ianvs
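
A minimal sketch of building and running this image, assuming kaggle.json sits next to the Dockerfile (the tag ianvs-experiment is an arbitrary choice):

    docker build -t ianvs-experiment .
    docker run -it ianvs-experiment bash

Note that `source activate` inside a RUN layer does not persist into the interactive shell, so run `conda activate ianvs-experiment` in the container before launching any benchmarks.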
198 changes: 154 additions & 44 deletions examples/cloud-edge-collaborative-inference-for-llm/README.md

Large diffs are not rendered by default.

@@ -2,4 +2,6 @@ vllm
transformers
openai
accelerate
datamodel_code_generator
kaggle
groq
@@ -31,9 +31,18 @@ class CloudModel:
def __init__(self, **kwargs):
"""Initialize the CloudModel. See `APIBasedLLM` for details about `kwargs`.
"""
LOGGER.info("Initializing CloudModel with kwargs: %s", kwargs)
try:
self.model = APIBasedLLM(**kwargs)
except Exception as e:
LOGGER.error("Failed to initialize APIBasedLLM: %s", str(e))
raise RuntimeError("Could not initialize APIBasedLLM. Check your credentials or configuration.") from e
model_name = (kwargs.get("model") or "").strip()
if not model_name:
LOGGER.warning("No 'model' specified in kwargs. Falling back to default 'gpt-4o-mini'.")
model_name = "gpt-4o-mini"

self.load(model_name)

def load(self, model):
"""Set the model.
@@ -43,7 +52,15 @@ def load(self, model):
model : str
Existing model from your OpenAI provider. Example: `gpt-4o-mini`
"""
if not model or not isinstance(model, str):
raise ValueError("Model name must be a non-empty string.")

try:
self.model._load(model=model)
LOGGER.info("Model '%s' loaded successfully.", model)
except Exception as e:
LOGGER.error("Error loading model '%s': %s", model, str(e))
raise RuntimeError(f"Failed to load model '{model}'.") from e

def inference(self, data, **kwargs):
"""Inference the model with the given data.
@@ -60,12 +77,21 @@ def inference(self, data, **kwargs):
dict
Formatted Response. See `model._format_response()` for more details.
"""
if not isinstance(data, dict):
raise ValueError("Input data for inference must be a dictionary.")

try:
return self.model.inference(data)
except Exception as e:
LOGGER.error("Inference failed: %s", str(e))
raise RuntimeError("Inference failed. Check input data format and model readiness.") from e

def cleanup(self):
"""Save the cache and cleanup the model.
"""

try:
self.model.save_cache()
self.model.cleanup()
LOGGER.info("Cleanup completed successfully.")
except Exception as e:
LOGGER.warning("Cleanup encountered an issue: %s", str(e))
@@ -35,5 +35,9 @@ def __call__(self, dataset):
sedna.datasources.BaseDataSource
Transformed dataset
"""
try:
dataset.x = [{"query": x, "gold": y} for x, y in zip(dataset.x, dataset.y)]
except Exception as e:
raise RuntimeError("Failed to transform dataset for Oracle Router.") from e

return dataset
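
To make the transform concrete, a small illustration with a stand-in for sedna.datasources.BaseDataSource (FakeDataSource is hypothetical, for demonstration only):

    class FakeDataSource:
        def __init__(self, x, y):
            self.x, self.y = x, y

    ds = FakeDataSource(x=["What is 2 + 2?"], y=["4"])
    # After the processor runs, each query is paired with its gold answer so the
    # Oracle router can later compare edge and cloud predictions against it:
    # ds.x == [{"query": "What is 2 + 2?", "gold": "4"}]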
@@ -40,16 +40,20 @@ def __init__(self, **kwargs):
- `backend`: str, default "huggingface". The serving framework to be used.
"""

LOGGER.info("Initializing EdgeModel with kwargs: %s", kwargs)
self.kwargs = kwargs
self.model_name = kwargs.get("model", None)
self.backend = kwargs.get("backend", "huggingface")
if self.backend not in ["huggingface", "vllm", "api", "EagleSpecDec", "LadeSpecDec"]:
raise ValueError(
f"Unsupported backend: {self.backend}. Supported options are: "
"'huggingface', 'vllm', 'api', 'EagleSpecDec', 'LadeSpecDec'."
)
self._set_config()

def _set_config(self):
"""Set the model path in our environment variables due to Sedna’s [check](https://github.com/kubeedge/sedna/blob/ac623ab32dc37caa04b9e8480dbe1a8c41c4a6c2/lib/sedna/core/base.py#L132).
"""

os.environ["model_path"] = self.model_name

def load(self, **kwargs):
@@ -60,18 +64,20 @@ def load(self, **kwargs):
Exception
When the backend is not supported.
"""
if self.backend == "huggingface":
self.model = HuggingfaceLLM(**self.kwargs)
elif self.backend == "vllm":
self.model = VllmLLM(**self.kwargs)
elif self.backend == "api":
self.model = APIBasedLLM(**self.kwargs)
elif self.backend == "EagleSpecDec":
self.model = EagleSpecDecModel(**self.kwargs)
elif self.backend == "LadeSpecDec":
self.model = LadeSpecDecLLM(**self.kwargs)
else:
raise Exception(f"Backend {self.backend} is not supported. Please use 'huggingface', 'vllm', or `api` ")
try:
if self.backend == "huggingface":
self.model = HuggingfaceLLM(**self.kwargs)
elif self.backend == "vllm":
self.model = VllmLLM(**self.kwargs)
elif self.backend == "api":
self.model = APIBasedLLM(**self.kwargs)
elif self.backend == "EagleSpecDec":
self.model = EagleSpecDecModel(**self.kwargs)
elif self.backend == "LadeSpecDec":
self.model = LadeSpecDecLLM(**self.kwargs)
except Exception as e:
LOGGER.error(f"Failed to initialize model backend `{self.backend}`: {str(e)}")
raise RuntimeError(f"Model loading failed for backend `{self.backend}`.") from e

def predict(self, data, **kwargs):
"""Inference the model with the given data.
@@ -89,13 +95,18 @@ def predict(self, data, **kwargs):
Formatted Response. See `model._format_response()` for more details.
"""

try:
return self.model.inference(data)
except Exception as e:
LOGGER.error(f"Inference failed: {e}")
raise RuntimeError("Inference failed due to an internal error.") from e

def cleanup(self):
"""Save the cache and cleanup the model.
"""

try:
self.model.save_cache()
self.model.cleanup()
except Exception as e:
LOGGER.warning(f"Cleanup failed: {e}")
@@ -15,13 +15,16 @@
"""Hard Example Mining Algorithms"""

import abc
import torch
import random
from transformers import pipeline
from sedna.common.class_factory import ClassFactory, ClassType
from core.common.log import LOGGER

__all__ = ('BERTFilter', 'EdgeOnlyFilter', 'CloudOnlyFilter',
'RandomRouterFilter', 'OracleRouterFilter', 'ResourceSensitiveRouterFilter')

device = "cuda" if torch.cuda.is_available() else "cpu"

class BaseFilter(metaclass=abc.ABCMeta):
"""The base class to define unified interface."""
@@ -73,7 +76,11 @@ def __init__(self, **kwargs):
self.task = kwargs.get("task", "text-classification")
self.max_length = kwargs.get("max_length", 512)

self.classifier = pipeline(self.task, model=self.model, device="cuda")
try:
self.classifier = pipeline(self.task, model=self.model, device=device)
except Exception as e:
LOGGER.error(f"Failed to initialize the pipeline: {e}")
raise RuntimeError("Pipeline initialization failed. Please check the model and task parameters.")

def _text_classification_postprocess(self, result):
"""Postprocess the text classification result
@@ -182,7 +189,10 @@ class RandomRouterFilter(BaseFilter, abc.ABC):
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.threshold = kwargs.get("threshold", 0)
self.threshold = kwargs.get("threshold", 0.5)
if not (0 <= self.threshold <= 1):
LOGGER.warning("Threshold must be between 0 and 1; got %s. Defaulting to 0.5.", self.threshold)
self.threshold = 0.5

def __call__(self, data=None) -> bool:
# Stay on edge (False) with probability `threshold`; otherwise route to cloud (True).
return random.random() >= self.threshold
@@ -200,7 +210,10 @@ def __init__(self, **kwargs):

self.edge_model = kwargs.get("edgemodel")
self.cloud_model = kwargs.get("cloudmodel")

if not self.edge_model or not self.cloud_model:
LOGGER.error("Both edge and cloud models must be provided.")
raise ValueError("Edge and cloud models are required for OracleRouterFilter.")

def __call__(self, data=None):
"""Route the query to edge or cloud based on the models' prediction.

@@ -251,3 +264,51 @@ def cleanup(self):
f"Cloud Better: {self.cloud_better}"
]
LOGGER.info("".join(message))

@ClassFactory.register(ClassType.HEM, alias="ResourceSensitiveRouter")
class ResourceSensitiveRouterFilter(BaseFilter, abc.ABC):
"""
A resource-aware router that adapts routing based on real-time edge device constraints.
Routes to cloud if edge device is under resource pressure; otherwise, processes locally.
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)

# Thresholds can be adjusted based on empirical device behavior
self.temperature_threshold = kwargs.get("temperature_threshold", 75) # in °C
self.battery_threshold = kwargs.get("battery_threshold", 20) # in %
self.cpu_threshold = kwargs.get("cpu_threshold", 85) # in %
self.memory_threshold = kwargs.get("memory_threshold", 85) # in %

# These can be real checks in production; here we simulate/mock for demonstration
self.resource_monitor = kwargs.get("resource_monitor", self.mock_resource_monitor)

def __call__(self, data=None) -> bool:
"""
Route based on resource constraints.
Returns True for hard sample (go to cloud), False for easy sample (stay on edge).
"""
resources = self.resource_monitor()

is_overloaded = (
resources["temperature"] > self.temperature_threshold or
resources["battery"] < self.battery_threshold or
resources["cpu"] > self.cpu_threshold or
resources["memory"] > self.memory_threshold
)

if is_overloaded:
LOGGER.info("Routing to cloud due to resource constraints.")
else:
LOGGER.info("Sufficient edge resources, processing locally.")

return is_overloaded # True means cloud (hard), False means edge (easy)

def mock_resource_monitor(self):
"""Mock resource monitor that simulates device conditions."""
return {
"temperature": random.uniform(40, 140), # °C
"battery": random.uniform(5, 100), # %
"cpu": random.uniform(10, 100), # %
"memory": random.uniform(10, 100) # %
}
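
The mock above only simulates telemetry. A hedged sketch of what a production-style resource_monitor could look like, built on the psutil package (psutil is an assumption here; it is not in this example's requirements.txt):

    import psutil

    def psutil_resource_monitor():
        """Return live readings in the dict shape the router expects."""
        battery = psutil.sensors_battery()       # None on machines without a battery
        temps = psutil.sensors_temperatures()    # {} where no thermal sensors are exposed
        # Take the first available temperature reading, falling back to a safe value.
        current_temp = next(
            (t.current for readings in temps.values() for t in readings), 0
        )
        return {
            "temperature": current_temp,                     # °C
            "battery": battery.percent if battery else 100,  # % (assume mains power)
            "cpu": psutil.cpu_percent(interval=0.1),         # %
            "memory": psutil.virtual_memory().percent,       # %
        }

    # Wiring it in:
    # ResourceSensitiveRouterFilter(resource_monitor=psutil_resource_monitor)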