From bc915db2cb2f848a670c273a880cb929c4213ddd Mon Sep 17 00:00:00 2001 From: Safoora Yousefi Date: Wed, 11 Dec 2024 06:46:26 +0000 Subject: [PATCH 1/9] readme updates --- README.md | 56 ++++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 39 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index 98187530..822cacad 100644 --- a/README.md +++ b/README.md @@ -18,20 +18,42 @@

This repository contains the code for the Eureka ML Insights framework. The framework is designed to help researchers and practitioners run reproducible evaluations of generative models using a variety of benchmarks and metrics efficiently. The framework allows the user to define custom pipelines for data processing, inference, and evaluation, and provides a set of pre-defined evaluation pipelines for key benchmarks. +## Table of Contents +- [Eureka ML Insights Framework](#eureka-ml-insights-framework) + - [Table of Contents](#table-of-contents) + - [Benchmarks](#benchmarks) + - [Installation](#installation) + - [📦 Installing with pip + editable for development](#-installing-with-pip--editable-for-development) + - [📦 Generate wheel package to share with others](#-generate-wheel-package-to-share-with-others) + - [🐍Installing with Conda](#installing-with-conda) + - [🚀 Quick start](#-quick-start) + - [🗺️ Overview of Experiment Pipelines](#️-overview-of-experiment-pipelines) + - [⚒️ Utility Classes Used in Components](#️-utility-classes-used-in-components) + - [🪛 Configuring the Data Processing Component](#-configuring-the-data-processing-component) + - [🪛 Configuring the Prompt Processing Component](#-configuring-the-prompt-processing-component) + - [🪛 Configuring the Inference Component](#-configuring-the-inference-component) + - [🪛 Configuring the Evaluation Reporting Component](#-configuring-the-evaluation-reporting-component) +- [✋ How to contribute:](#-how-to-contribute) +- [✒️ Citation](#️-citation) +- [Responsible AI Considerations](#responsible-ai-considerations) + +## Benchmarks +The following table summarizes the benchmarks included in Eureka-Bench, their modalities, capabilities, and the corresponding experiment pipelines. The logs for each benchmark are available for download at the links provided in the table. + | Benchmark
#prompts | Modality | Capability |Logs| Pipeline Config | |-------------------------------|---------------|----------------------|------|-----| -| GeoMeter
1086 | Image -> Text | Geometric Reasoning | [GeoMeter.zip](https://aifeval.z5.web.core.windows.net/eureka-bench-logs/GeoMeter.zip) | [geometer.py](eureka_ml_insights/configs/geometer.py) | -| MMMU
900 | Image -> Text | Multimodal QA | [MMMU.zip](https://aifeval.z5.web.core.windows.net/eureka-bench-logs/MMMU.zip) |[mmmu.py](eureka_ml_insights/configs/mmmu.py)| -| Image Understanding
10249| Image -> Text | Object Recognition
Object Detection
Visual Prompting
Spatial Reasoning | [IMAGE_UNDERSTANDING.zip](https://aifeval.z5.web.core.windows.net/eureka-bench-logs/IMAGE_UNDERSTANDING.zip) | [object_recognition.py](eureka_ml_insights/configs/spatial_understanding/object_recognition.py)
[object_detection.py](eureka_ml_insights/configs/spatial_understanding/object_detection.py)
[visual_prompting.py](eureka_ml_insights/configs/spatial_understanding/visual_prompting.py)
[spatial_reasoning.py](eureka_ml_insights/configs/spatial_understanding/spatial_reasoning.py) | -| Vision Language
13500 | Image -> Text | Spatial Understanding
Navigation
Counting| [VISION_LANGUAGE.zip](https://aifeval.z5.web.core.windows.net/eureka-bench-logs/VISION_LANGUAGE.zip) |[spatial_map.py](eureka_ml_insights/configs/vision_language/spatial_map.py)
[maze.py](eureka_ml_insights/configs/vision_language/maze.py)
[spatial_grid.py](eureka_ml_insights/configs/vision_language/spatial_grid.py)| -| IFEval
541 | Text -> Text | Instruction Following | [IFEval.zip](https://aifeval.z5.web.core.windows.net/eureka-bench-logs/IFEval.zip) |[ifeval.py](eureka_ml_insights/configs/ifeval.py)| -| FlenQA
12000 | Text -> Text | Long Context Multi-hop QA | [FlenQA.zip](https://aifeval.z5.web.core.windows.net/eureka-bench-logs/FlenQA.zip) |[flenQA.py](eureka_ml_insights/configs/flenqa.py)| -| Kitab
34217 | Text -> Text | Information Retrieval | [Kitab.zip](https://aifeval.z5.web.core.windows.net/eureka-bench-logs/Kitab.zip) |[kitab.py](eureka_ml_insights/configs/kitab.py)| -| Toxigen
10500 | Text -> Text | Toxicity Detection
Safe Language Generation | [ToxiGen.zip](https://aifeval.z5.web.core.windows.net/eureka-bench-logs/ToxiGen.zip) |[toxigen.py](eureka_ml_insights/configs/toxigen.py)| +| GeoMeter
1086 | Image -> Text | Geometric Reasoning | [GeoMeter.zip](https://aifeval.z5.web.core.windows.net/eureka-bench-logs/GeoMeter.zip) | [geometer.py](eureka_ml_insights/user_configs/geometer.py) | +| MMMU
900 | Image -> Text | Multimodal QA | [MMMU.zip](https://aifeval.z5.web.core.windows.net/eureka-bench-logs/MMMU.zip) |[mmmu.py](eureka_ml_insights/user_configs/mmmu.py)| +| Image Understanding
10249| Image -> Text | Object Recognition
Object Detection
Visual Prompting
Spatial Reasoning | [IMAGE_UNDERSTANDING.zip](https://aifeval.z5.web.core.windows.net/eureka-bench-logs/IMAGE_UNDERSTANDING.zip) | [object_recognition.py](eureka_ml_insights/user_configs/spatial_understanding/object_recognition.py)
[object_detection.py](eureka_ml_insights/user_configs/spatial_understanding/object_detection.py)
[visual_prompting.py](eureka_ml_insights/user_configs/spatial_understanding/visual_prompting.py)
[spatial_reasoning.py](eureka_ml_insights/user_configs/spatial_understanding/spatial_reasoning.py) | +| Vision Language
13500 | Image -> Text | Spatial Understanding
Navigation
Counting| [VISION_LANGUAGE.zip](https://aifeval.z5.web.core.windows.net/eureka-bench-logs/VISION_LANGUAGE.zip) |[spatial_map.py](eureka_ml_insights/user_configs/vision_language/spatial_map.py)
[maze.py](eureka_ml_insights/user_configs/vision_language/maze.py)
[spatial_grid.py](eureka_ml_insights/user_configs/vision_language/spatial_grid.py)| +| IFEval
541 | Text -> Text | Instruction Following | [IFEval.zip](https://aifeval.z5.web.core.windows.net/eureka-bench-logs/IFEval.zip) |[ifeval.py](eureka_ml_insights/user_configs/ifeval.py)| +| FlenQA
12000 | Text -> Text | Long Context Multi-hop QA | [FlenQA.zip](https://aifeval.z5.web.core.windows.net/eureka-bench-logs/FlenQA.zip) |[flenQA.py](eureka_ml_insights/user_configs/flenqa.py)| +| Kitab
34217 | Text -> Text | Information Retrieval | [Kitab.zip](https://aifeval.z5.web.core.windows.net/eureka-bench-logs/Kitab.zip) |[kitab.py](eureka_ml_insights/user_configs/kitab.py)| +| Toxigen
10500 | Text -> Text | Toxicity Detection
Safe Language Generation | [ToxiGen.zip](https://aifeval.z5.web.core.windows.net/eureka-bench-logs/ToxiGen.zip) |[toxigen.py](eureka_ml_insights/user_configs/toxigen.py)| Note: The benchmarks on Image Understanding and Vision Language Understanding will be available soon on HuggingFace. Please stay tuned. -For non-determinism evaluations using the above benchmarks, we provide pipelines in [nondeterminism.py](eureka_ml_insights/configs/nondeterminism.py) +For non-determinism evaluations using the above benchmarks, we provide pipelines in [nondeterminism.py](eureka_ml_insights/user_configs/nondeterminism.py) ## Installation To get started, clone this repository to your local machine and navigate to the project directory. @@ -64,16 +86,16 @@ To reproduce the results of a pre-defined experiment pipeline, you can run the f ```python main.py --exp_config exp_config_name --model_config model_config_name --exp_logdir your_log_dir``` -For example, to run the `FlenQA_Experiment_Pipeline` experiment pipeline defined in `eureka_ml_insights/configs/flenqa.py` using the OpenAI GPT4 1106 Preview model, you can run the following command: +For example, to run the `FlenQA_Experiment_Pipeline` experiment pipeline defined in `eureka_ml_insights/user_configs/flenqa.py` using the OpenAI GPT4 1106 Preview model, you can run the following command: ```python main.py --exp_config FlenQA_Experiment_Pipeline --model_config OAI_GPT4_1106_PREVIEW_CONFIG --exp_logdir gpt4_1106_preveiw``` The results of the experiment will be saved in a directory under `logs/FlenQA_Experiment_Pipeline/gpt4_1106_preveiw`. For each experiment you run with these configurations, a new directory will be created using the date and time of the experiment run. -For other available experiment pipelines and model configurations, see the `eureka_ml_insights/configs` directory. In [model_configs.py](eureka_ml_insights/configs/model_configs.py) you can configure the model classes to use your API keys, Keu Vault urls, endpoints, and other model-specific configurations. +For other available experiment pipelines and model configurations, see the `eureka_ml_insights/user_configs` and `eureka_ml_insights/configs` directories, respectively. In [model_configs.py](eureka_ml_insights/configs/model_configs.py) you can configure the model classes to use your API keys, Key Vault urls, endpoints, and other model-specific configurations. ## 🗺️ Overview of Experiment Pipelines ![Components](./docs/figures/transparent_uml.png) -Experiment pipelines define the sequence of components that are run to process data, run inference, and evaluate the model outputs. You can find examples of experiment pipeline configurations in the `configs` directory. To create a new experiment configuration, you need to define a class that inherits from `ExperimentConfig` and implements the `configure_pipeline` method. In the `configure_pipeline` method you define the Pipeline config (arrangement of Components) for your Experiment. Once your class is ready, add it to `configs/__init__.py` import list. +Experiment pipelines define the sequence of components that are run to process data, run inference, and evaluate the model outputs. You can find examples of experiment pipeline configurations in the `user_configs` directory. To create a new experiment configuration, you need to define a class that inherits from `ExperimentConfig` and implements the `configure_pipeline` method. In the `configure_pipeline` method you define the Pipeline config (arrangement of Components) for your Experiment. Once your class is ready, add it to `user_configs/__init__.py` import list. Your Pipeline can use any of the available Components which can be found under the `core` directory: @@ -84,17 +106,17 @@ Your Pipeline can use any of the available Components which can be found under t - `DataJoin`: you can use this component to join two sources of data, for example to join the model outputs with the ground truth data for evaluation. Note that: -- You can inherit from one of the existing experiment config classes and override the necessary attributes to reduce the amount of code you need to write. You can find examples of this in [spatial_reasoning.py](eureka_ml_insights/configs/spatial_understanding/spatial_reasoning.py). +- You can inherit from one of the existing experiment config classes and override the necessary attributes to reduce the amount of code you need to write. You can find examples of this in [spatial_reasoning.py](eureka_ml_insights/user_configs/image_understanding/spatial_reasoning.py). - Your pipeline does not need to use all of the components. You can use only the components you need. And you can use the components multiple times in the pipeline. - Make sure the input of each component matches the output of the previous component in the pipeline. The components are run sequentially in the order they are defined in the pipeline configuration. - For standard scenarios you do not need to implement new components for your pipeline, but you do need to configure the existing components to use the correct utility classes (i.e. models, data readers, metrics, etc.) for your scenario. ### ⚒️ Utility Classes Used in Components -Utility classes include Models, Metrics, DataLoaders, DataReaders, etc. The components in your pipeline need to use the correct utility classes for your scenario. For example, to evaluate an OpenAI model on a dataset that is available on HuggingFace, you need to use the [`HFDataReader`](eureka_ml_insights/data_utils/data.py) data reader and the [`OpenAIModelsOAI`](eureka_ml_insights/models/models.py) model class. In standard scenarios do not need to implement new components for your pipeline, but you do need to configure the existing components to work with the correct utility classes. If you need a functionality that is not provided by the existing utility classes, you can implement a new utility class and use it in your pipeline. +Utility classes include Models, Metrics, DataLoaders, DataReaders, etc. The components in your pipeline need to use the correct utility classes for your scenario. For example, to evaluate an OpenAI model on a dataset that is available on HuggingFace, you need to use the [`HFDataReader`](eureka_ml_insights/data_utils/data.py) data reader and the [`AzureOpenAIModel`](eureka_ml_insights/models/models.py) (or alternatively, `DirectOpenAIModel`) model class. In standard scenarios do not need to implement new components for your pipeline, but you do need to configure the existing components to work with the correct utility classes. If you need a functionality that is not provided by the existing utility classes, you can implement a new utility class and use it in your pipeline. In general, to find out what utility classes and other attributes need to be configured for a component, you can look at the component's corresponding Config dataclass in `configs/config.py`. For example, if you are configuring the `DataProcessing` component, you can look at the `DataProcessingConfig` dataclass in `configs/config.py`. -Utility classes are also configurable by providing the name of the class and the initialization arguments. For example see ModelConfig in `configs/config.py` that can be initialized with the model class name and the model initialization arguments. +Utility classes are also configurable. You can do so by providing the name of the class and the initialization arguments. For example see ModelConfig in `configs/config.py` that can be initialized with the model class name and the model initialization arguments. For example, you can see examples of configuring Model classes in `configs/model_configs.py`. Our current components use the following utility classes: `DataReader`, `DataLoader`, `Model`, `Metric`, `Aggregator`. You can use the existing utility classes or implement new ones as needed to configure your components. @@ -118,7 +140,7 @@ In addition to the attributes of the DataProcessing component, the PromptProcess - `data_loader_config`: Configuration of the data_loader class to use for inference. You can find the available data classes in `data_utils/data.py`. - `output_dir`: This is the folder name where the model outputs will be saved. This folder will automatically be created under the experiment log directory and the model outputs will be saved in a file called `inference_result.jsonl`. -### 🪛 Configuring the Evaluation Reporting Component +### 🪛 Configuring the Evaluation Reporting Component - `data_reader_config`: Configuration object for the DataReader that is used to load the data into a pandas dataframe. This is the same type of utility class used in the DataProcessing component. - `metric_config`: a MetricConfig object to specify the metric class to use for evaluation. You can find the available metrics in `metrics/`. If you need to implement new metric classes, add them to this directory. - `aggregator_configs`/`visualizer_configs`: List of configs for aggregators/visualizers to apply to the metric results. These classes that take metric results and aggragate/analyze/vizualize them and save them. You can find the available aggregators and visualizers in `metrics/reports.py`. @@ -134,7 +156,7 @@ For more information see the [Code of Conduct FAQ](https://opensource.microsoft. To contribute to the framework: - please create a new branch. -- Implement your pipeline configuration class under `configs`, as well as any utility classes that your pipeline requires. +- Implement your pipeline configuration class under `user_configs`, as well as any utility classes that your pipeline requires. - Please add end-to-end tests for your contributions in the `tests` directory. - Please add unit tests for any new utility classes you implement in the `tests` directory. - Please add documentation to your classes and methods in form of docstrings. From 173538a817b54a88dd9cf5e4809bd6357a0afb1a Mon Sep 17 00:00:00 2001 From: Safoora Yousefi Date: Tue, 11 Mar 2025 23:52:22 +0000 Subject: [PATCH 2/9] resolve concurrency issues --- eureka_ml_insights/core/inference.py | 47 +++++++++++----------------- 1 file changed, 18 insertions(+), 29 deletions(-) diff --git a/eureka_ml_insights/core/inference.py b/eureka_ml_insights/core/inference.py index 133593f1..f31469b5 100644 --- a/eureka_ml_insights/core/inference.py +++ b/eureka_ml_insights/core/inference.py @@ -1,4 +1,3 @@ -import asyncio import logging import os import time @@ -40,6 +39,7 @@ def __init__( chat_mode (bool): optional. If True, the model will be used in chat mode, where a history of messages will be maintained in "previous_messages" column. """ super().__init__(output_dir) + self.model_config = model_config self.model = model_config.class_name(**model_config.init_args) self.data_loader = data_config.class_name(**data_config.init_args) self.writer = JsonLinesWriter(os.path.join(output_dir, "inference_result.jsonl")) @@ -170,7 +170,7 @@ def retrieve_exisiting_result(self, data, pre_inf_results_df): def run(self): if self.max_concurrent > 1: - asyncio.run(self._run_par()) + self._run_par() else: self._run() @@ -205,23 +205,7 @@ def _run(self): data.update(response_dict) writer.write(data) - from functools import partial - - async def run_in_excutor(self, model_inputs, executor): - """Run model.generate in a ThreadPoolExecutor. - args: - model_inputs (tuple): args and kwargs to be passed to the model.generate function. - executor (ThreadPoolExecutor): ThreadPoolExecutor instance. - """ - loop = asyncio.get_event_loop() - - # function to run in executor with args and kwargs - def sub_func(model_inputs): - return self.model.generate(*model_inputs[0], **model_inputs[1]) - - return await loop.run_in_executor(executor, sub_func, model_inputs) - - async def _run_par(self): + def _run_par(self): """parallel inference""" concurrent_inputs = [] concurrent_metadata = [] @@ -240,8 +224,8 @@ async def _run_par(self): # if batch is ready for concurrent inference elif len(concurrent_inputs) >= self.max_concurrent: - with ThreadPoolExecutor() as executor: - await self.run_batch(concurrent_inputs, concurrent_metadata, writer, executor) + with ThreadPoolExecutor(max_workers=self.max_concurrent) as executor: + self.run_batch(concurrent_inputs, concurrent_metadata, writer, executor) concurrent_inputs = [] concurrent_metadata = [] # add data to batch for concurrent inference @@ -249,10 +233,10 @@ async def _run_par(self): concurrent_metadata.append(data) # if data loader is exhausted but there are remaining data points that did not form a full batch if concurrent_inputs: - with ThreadPoolExecutor() as executor: - await self.run_batch(concurrent_inputs, concurrent_metadata, writer, executor) + with ThreadPoolExecutor(max_workers=self.max_concurrent) as executor: + self.run_batch(concurrent_inputs, concurrent_metadata, writer, executor) - async def run_batch(self, concurrent_inputs, concurrent_metadata, writer, executor): + def run_batch(self, concurrent_inputs, concurrent_metadata, writer, executor): """Run a batch of inferences concurrently using ThreadPoolExecutor. args: concurrent_inputs (list): list of inputs to the model.generate function. @@ -260,11 +244,16 @@ async def run_batch(self, concurrent_inputs, concurrent_metadata, writer, execut writer (JsonLinesWriter): JsonLinesWriter instance to write the results. executor (ThreadPoolExecutor): ThreadPoolExecutor instance. """ - tasks = [asyncio.create_task(self.run_in_excutor(input_data, executor)) for input_data in concurrent_inputs] - results = await asyncio.gather(*tasks) - for i in range(len(concurrent_inputs)): - data, response_dict = concurrent_metadata[i], results[i] + def sub_func(model_inputs): + # create a new instance of the model for thread-data-safety purposes + model = self.model_config.class_name(**self.model_config.init_args) + model.chat_mode = self.chat_mode + return model.generate(*model_inputs[0], **model_inputs[1]) + + results = executor.map(sub_func, concurrent_inputs) + for i, result in enumerate(results): + data, response_dict = concurrent_metadata[i], result self.validate_response_dict(response_dict) # prepare results for writing data.update(response_dict) - writer.write(data) + writer.write(data) \ No newline at end of file From d77dfad3df59be7c318ac646cff5d5bc24db5a62 Mon Sep 17 00:00:00 2001 From: Safoora Yousefi Date: Wed, 12 Mar 2025 04:29:04 +0000 Subject: [PATCH 3/9] formatting --- eureka_ml_insights/core/inference.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/eureka_ml_insights/core/inference.py b/eureka_ml_insights/core/inference.py index f31469b5..c766c0b5 100644 --- a/eureka_ml_insights/core/inference.py +++ b/eureka_ml_insights/core/inference.py @@ -244,6 +244,7 @@ def run_batch(self, concurrent_inputs, concurrent_metadata, writer, executor): writer (JsonLinesWriter): JsonLinesWriter instance to write the results. executor (ThreadPoolExecutor): ThreadPoolExecutor instance. """ + def sub_func(model_inputs): # create a new instance of the model for thread-data-safety purposes model = self.model_config.class_name(**self.model_config.init_args) @@ -256,4 +257,4 @@ def sub_func(model_inputs): self.validate_response_dict(response_dict) # prepare results for writing data.update(response_dict) - writer.write(data) \ No newline at end of file + writer.write(data) From 382f452f4e47605de33f0e253e12583170696ed9 Mon Sep 17 00:00:00 2001 From: Safoora Yousefi Date: Thu, 13 Mar 2025 03:55:30 +0000 Subject: [PATCH 4/9] thread safety --- eureka_ml_insights/models/models.py | 328 ++++++++++++++++------------ 1 file changed, 191 insertions(+), 137 deletions(-) diff --git a/eureka_ml_insights/models/models.py b/eureka_ml_insights/models/models.py index 63351ce3..ae7ac687 100644 --- a/eureka_ml_insights/models/models.py +++ b/eureka_ml_insights/models/models.py @@ -13,6 +13,7 @@ from eureka_ml_insights.secret_management import get_secret + @dataclass class Model(ABC): """This class is used to define the structure of a model class. @@ -20,29 +21,27 @@ class Model(ABC): containing the model_output, is_valid, and other relevant information. """ - model_output: str = None - is_valid: bool = False - response_time: float = None - n_output_tokens: int = None chat_mode: bool = False - previous_messages: list = None @abstractmethod def generate(self, text_prompt, *args, **kwargs): raise NotImplementedError - def count_tokens(self): + def count_tokens(self, model_output: str = None, is_valid: bool = False): """ This method uses tiktoken tokenizer to count the number of tokens in the response. See: https://github.com/openai/openai-cookbook/blob/main/examples/How_to_count_tokens_with_tiktoken.ipynb + args: + model_output (str): the text response from the model. + is_valid (bool): whether the response is valid or not. returns: n_output_tokens (int): the number of tokens in the text response. """ encoding = tiktoken.get_encoding("cl100k_base") - if self.model_output is None or not self.is_valid: + if model_output is None or not is_valid: return None else: - n_output_tokens = len(encoding.encode(self.model_output)) + n_output_tokens = len(encoding.encode(model_output)) return n_output_tokens def base64encode(self, query_images): @@ -100,18 +99,19 @@ def get_response(self, request): # must return the model output and the response time raise NotImplementedError - def update_chat_history(self, query_text, *args, **kwargs): + def update_chat_history(self, query_text, model_output, *args, **kwargs): """ This method is used to update the chat history with the model response. args: query_text (str): the text prompt to generate the response. + model_output (str): the text response from the model. returns: previous_messages (list): a list of messages in the chat history. """ previous_messages = kwargs.get("previous_messages", []) previous_messages.append({"role": "user", "content": query_text}) - previous_messages.append({"role": "assistant", "content": self.model_output}) - self.previous_messages = previous_messages + previous_messages.append({"role": "assistant", "content": model_output}) + return previous_messages def generate(self, query_text, *args, **kwargs): """ @@ -127,38 +127,41 @@ def generate(self, query_text, *args, **kwargs): response_dict = {} request = self.create_request(query_text, *args, **kwargs) attempts = 0 + model_output = None + is_valid = False + response_time = None + while attempts < self.num_retries: try: - meta_response = self.get_response(request) + model_response = self.get_response(request) + if model_response: + response_dict.update(model_response) + model_output = model_response["model_output"] + response_time = model_response["response_time"] if self.chat_mode: - self.update_chat_history(query_text, *args, **kwargs) - if meta_response: - response_dict.update(meta_response) - self.is_valid = True + previous_messages = self.update_chat_history(query_text, model_output, *args, **kwargs) + + is_valid = True break except Exception as e: logging.warning(f"Attempt {attempts+1}/{self.num_retries} failed: {e}") do_return = self.handle_request_error(e) if do_return: - self.model_output = None - self.is_valid = False break attempts += 1 else: logging.warning("All attempts failed.") - self.is_valid = False - self.model_output = None response_dict.update( { - "is_valid": self.is_valid, - "model_output": self.model_output, - "response_time": self.response_time, - "n_output_tokens": self.count_tokens(), + "is_valid": is_valid, + "model_output": model_output, + "response_time": response_time, + "n_output_tokens": self.count_tokens(model_output, is_valid), } ) if self.chat_mode: - response_dict.update({"previous_messages": self.previous_messages}) + response_dict.update({"previous_messages": previous_messages}) return response_dict @abstractmethod @@ -217,8 +220,9 @@ def get_response(self, request): end_time = time.time() # Parse the response and return the model output. res = json.loads(response.read()) - self.model_output = res["output"] - self.response_time = end_time - start_time + model_output = res["output"] + response_time = end_time - start_time + return {"model_output": model_output, "response_time": response_time} def handle_request_error(self, e): if isinstance(e, urllib.error.HTTPError): @@ -227,7 +231,7 @@ def handle_request_error(self, e): logging.info(e.info()) logging.info(e.read().decode("utf8", "ignore")) else: - logging.info("The request failed with: "+ str(e)) + logging.info("The request failed with: " + str(e)) return False @@ -248,24 +252,22 @@ def __post_init__(self): self.headers = { "Content-Type": "application/json", "Authorization": ("Bearer " + self.api_key), - # The behavior of the API when extra parameters are indicated in the payload. - # Using pass-through makes the API to pass the parameter to the underlying model. - # Use this value when you want to pass parameters that you know the underlying model can support. + # The behavior of the API when extra parameters are indicated in the payload. + # Using pass-through makes the API to pass the parameter to the underlying model. + # Use this value when you want to pass parameters that you know the underlying model can support. # https://learn.microsoft.com/en-us/azure/machine-learning/reference-model-inference-chat-completions?view=azureml-api-2 - "extra-parameters": "pass-through" + "extra-parameters": "pass-through", } except ValueError: - self.bearer_token_provider = get_bearer_token_provider( - DefaultAzureCredential(), self.auth_scope - ) + self.bearer_token_provider = get_bearer_token_provider(DefaultAzureCredential(), self.auth_scope) self.headers = { "Content-Type": "application/json", "Authorization": ("Bearer " + self.bearer_token_provider()), - # The behavior of the API when extra parameters are indicated in the payload. - # Using pass-through makes the API to pass the parameter to the underlying model. + # The behavior of the API when extra parameters are indicated in the payload. + # Using pass-through makes the API to pass the parameter to the underlying model. # Use this value when you want to pass parameters that you know the underlying model can support. # https://learn.microsoft.com/en-us/azure/machine-learning/reference-model-inference-chat-completions?view=azureml-api-2 - "extra-parameters": "pass-through" + "extra-parameters": "pass-through", } @abstractmethod @@ -279,10 +281,15 @@ def get_response(self, request): response = urllib.request.urlopen(request, timeout=self.timeout) end_time = time.time() res = json.loads(response.read()) - self.model_output = res["choices"][0]["message"]["content"] - self.response_time = end_time - start_time + model_output = res["choices"][0]["message"]["content"] + response_time = end_time - start_time + response_dict = { + "model_output": model_output, + "response_time": response_time, + } if "usage" in res: - return {"usage": res["usage"]} + return response_dict.update({"usage": res["usage"]}) + return response_dict def handle_request_error(self, e): if isinstance(e, urllib.error.HTTPError): @@ -291,7 +298,7 @@ def handle_request_error(self, e): logging.info(e.info()) logging.info(e.read().decode("utf8", "ignore")) else: - logging.info("The request failed with: "+ str(e)) + logging.info("The request failed with: " + str(e)) return False @@ -333,7 +340,6 @@ def create_request(self, text_prompt, query_images=None, system_message=None, pr ] messages.append({"role": "user", "content": user_content}) - data = { "messages": messages, "max_tokens": self.max_tokens, @@ -431,10 +437,15 @@ def get_response(self, request): ) end_time = time.time() openai_response = completion.model_dump() - self.model_output = openai_response["choices"][0]["message"]["content"] - self.response_time = end_time - start_time + model_output = openai_response["choices"][0]["message"]["content"] + response_time = end_time - start_time + response_dict = { + "model_output": model_output, + "response_time": response_time, + } if "usage" in openai_response: - return {"usage": openai_response["usage"]} + return response_dict.update({"usage": openai_response["usage"]}) + return response_dict class AzureOpenAIClientMixIn: @@ -443,9 +454,7 @@ class AzureOpenAIClientMixIn: def get_client(self): from openai import AzureOpenAI - token_provider = get_bearer_token_provider( - DefaultAzureCredential(), self.auth_scope - ) + token_provider = get_bearer_token_provider(DefaultAzureCredential(), self.auth_scope) return AzureOpenAI( azure_endpoint=self.url, api_version=self.api_version, @@ -514,19 +523,19 @@ def __post_init__(self): class OpenAIO1RequestResponseMixIn: - + def create_request(self, text_prompt, query_images=None, system_message=None, previous_messages=None): messages = [] if system_message and "o1-preview" in self.model_name: logging.warning("System and developer messages are not supported by OpenAI O1 preview model.") elif system_message: - # Developer messages are the new system messages: - # Starting with o1-2024-12-17, o1 models support developer messages rather than system messages, + # Developer messages are the new system messages: + # Starting with o1-2024-12-17, o1 models support developer messages rather than system messages, # to align with the chain of command behavior described in the model spec. - messages.append({"role": "developer", "content": system_message}) + messages.append({"role": "developer", "content": system_message}) if previous_messages: messages.extend(previous_messages) - + user_content = text_prompt if query_images and "o1-preview" in self.model_name: logging.warning("Images are not supported by OpenAI O1 preview model.") @@ -558,10 +567,15 @@ def get_response(self, request): ) end_time = time.time() openai_response = completion.model_dump() - self.model_output = openai_response["choices"][0]["message"]["content"] - self.response_time = end_time - start_time + model_output = openai_response["choices"][0]["message"]["content"] + response_time = end_time - start_time + response_dict = { + "model_output": model_output, + "response_time": response_time, + } if "usage" in openai_response: - return {"usage": openai_response["usage"]} + return response_dict.update({"usage": openai_response["usage"]}) + return response_dict @dataclass @@ -598,7 +612,6 @@ class AzureOpenAIO1Model(OpenAIO1RequestResponseMixIn, AzureOpenAIClientMixIn, E api_version: str = "2023-06-01-preview" auth_scope: str = "https://cognitiveservices.azure.com/.default" - def __post_init__(self): self.client = self.get_client() @@ -639,7 +652,7 @@ def create_request(self, text_prompt, query_images=None, system_message=None, pr self.model = genai.GenerativeModel(self.model_name) else: self.model = genai.GenerativeModel(self.model_name, system_instruction=system_message) - + if query_images: return [text_prompt] + query_images else: @@ -647,58 +660,79 @@ def create_request(self, text_prompt, query_images=None, system_message=None, pr def get_response(self, request): start_time = time.time() - self.gemini_response = self.model.generate_content( - request, - generation_config=self.gen_config, - request_options={"timeout": self.timeout}, - safety_settings=self.safety_settings, - ) - end_time = time.time() - self.model_output = self.gemini_response.parts[0].text - self.response_time = end_time - start_time - if hasattr(self.gemini_response, "usage_metadata"): + try: + gemini_response = self.model.generate_content( + request, + generation_config=self.gen_config, + request_options={"timeout": self.timeout}, + safety_settings=self.safety_settings, + ) + end_time = time.time() + model_output = gemini_response.parts[0].text + response_time = end_time - start_time + except Exception as e: + is_non_transient_issue = self.handle_gemini_error(e, gemini_response) + if not is_non_transient_issue: + raise e + + response_dict = { + "model_output": model_output, + "response_time": response_time, + } + if hasattr(gemini_response, "usage_metadata"): try: - return { - "usage": { - "prompt_token_count": self.gemini_response.usage_metadata.prompt_token_count, - "candidates_token_count": self.gemini_response.usage_metadata.candidates_token_count, - "total_token_count": self.gemini_response.usage_metadata.total_token_count, + response_dict.update( + { + "usage": { + "prompt_token_count": gemini_response.usage_metadata.prompt_token_count, + "candidates_token_count": gemini_response.usage_metadata.candidates_token_count, + "total_token_count": gemini_response.usage_metadata.total_token_count, + } } - } + ) except AttributeError: logging.warning("Usage metadata not found in the response.") + return response_dict - def handle_request_error(self, e): + def handle_gemini_error(self, e, gemini_response): """Handles exceptions originating from making requests to Gemini through the python api. args: - e (_type_): Exception occurred during getting a response. - + e: Exception that occurred during getting a response. + gemini_response: The response object from the gemini model. returns: _type_: do_return (True if the call should not be attempted again). """ # Handling cases where the model explicitly blocks prompts and provides a reason for it. # In these cases, there is no need to make a new attempt as the model will continue to explicitly block the request, do_return = True. - if e.__class__.__name__ == "ValueError" and self.gemini_response.prompt_feedback.block_reason > 0: + if e.__class__.__name__ == "ValueError" and gemini_response.prompt_feedback.block_reason > 0: logging.warning( - f"Attempt failed due to explicitly blocked input prompt: {e} Block Reason {self.gemini_response.prompt_feedback.block_reason}" + f"Attempt failed due to explicitly blocked input prompt: {e} Block Reason {gemini_response.prompt_feedback.block_reason}" ) return True # Handling cases where the model implicitly blocks prompts and does not provide an explicit block reason for it but rather an empty content. # In these cases, there is no need to make a new attempt as the model will continue to implicitly block the request, do_return = True. # Note that, in some cases, the model may still provide a finish reason as shown here https://ai.google.dev/api/generate-content?authuser=2#FinishReason - elif e.__class__.__name__ == "IndexError" and len(self.gemini_response.parts) == 0: + elif e.__class__.__name__ == "IndexError" and len(gemini_response.parts) == 0: logging.warning(f"Attempt failed due to implicitly blocked input prompt and empty model output: {e}") # For cases where there are some response candidates do_return is still True because in most cases these candidates are incomplete. # Trying again may not necessarily help, unless in high temperature regimes. - if len(self.gemini_response.candidates) > 0: - logging.warning(f"The response is not empty and has : {len(self.gemini_response.candidates)} candidates") - logging.warning(f"Finish Reason for the first answer candidate is: {self.gemini_response.candidates[0].finish_reason}") - logging.warning(f"Safety Ratings for the first answer candidate are: {self.gemini_response.candidates[0].safety_ratings}") + if len(gemini_response.candidates) > 0: + logging.warning(f"The response is not empty and has : {len(gemini_response.candidates)} candidates") + logging.warning( + f"Finish Reason for the first answer candidate is: {gemini_response.candidates[0].finish_reason}" + ) + logging.warning( + f"Safety Ratings for the first answer candidate are: {gemini_response.candidates[0].safety_ratings}" + ) return True # Any other case will be re attempted again, do_return = False. return False + def handle_request_error(self, e): + return False + + @dataclass class TogetherModel(OpenAICommonRequestResponseMixIn, KeyBasedAuthMixIn, EndpointModel): """This class is used to interact with Together models through the together python api.""" @@ -709,13 +743,14 @@ class TogetherModel(OpenAICommonRequestResponseMixIn, KeyBasedAuthMixIn, Endpoin max_tokens: int = 65536 top_p: float = 0.95 presence_penalty: float = 0 - stop=["<|end▁of▁sentence|>"] + stop = ["<|end▁of▁sentence|>"] def __post_init__(self): from together import Together + self.api_key = self.get_api_key() self.client = Together(api_key=self.api_key) - + def get_response(self, request): start_time = time.time() completion = self.client.chat.completions.create( @@ -724,21 +759,26 @@ def get_response(self, request): presence_penalty=self.presence_penalty, temperature=self.temperature, max_tokens=self.max_tokens, - stop = self.stop, + stop=self.stop, **request, ) - + end_time = time.time() openai_response = completion.model_dump() - self.model_output = openai_response["choices"][0]["message"]["content"] - self.response_time = end_time - start_time + model_output = openai_response["choices"][0]["message"]["content"] + response_time = end_time - start_time + response_dict = { + "model_output": model_output, + "response_time": response_time, + } if "usage" in openai_response: - return {"usage": openai_response["usage"]} + response_dict.update({"usage": openai_response["usage"]}) + return response_dict def handle_request_error(self, e): - logging.warning(e) return False - + + @dataclass class HuggingFaceModel(Model): """This class is used to run a self-hosted language model via HuggingFace apis.""" @@ -760,8 +800,8 @@ def __post_init__(self): self.get_model() def get_model(self): - from transformers import AutoModelForCausalLM, AutoTokenizer import torch + from transformers import AutoModelForCausalLM, AutoTokenizer quantization_config = None if self.quantize: @@ -782,7 +822,6 @@ def get_model(self): use_flash_attention_2=self.use_flash_attn, ) - self.tokenizer = AutoTokenizer.from_pretrained(self.model_name, use_fast=False) def pick_available_device(self): @@ -823,11 +862,15 @@ def _generate(self, text_prompt, query_images=None): end_time = time.time() sequence_length = inputs["input_ids"].shape[1] new_output_ids = output_ids[:, sequence_length:] - self.model_output = self.tokenizer.batch_decode( + model_output = self.tokenizer.batch_decode( new_output_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False )[0] - self.response_time = end_time - start_time + response_time = end_time - start_time + return { + "model_output": model_output, + "response_time": response_time, + } def generate(self, text_prompt, query_images=None, system_message=None): response_dict = {} @@ -837,21 +880,19 @@ def generate(self, text_prompt, query_images=None, system_message=None): text_prompt = self.model_template_fn(text_prompt, system_message) try: - meta_response = self._generate(text_prompt, query_images=query_images) - if meta_response: - response_dict.update(meta_response) - self.is_valid = True + model_response = self._generate(text_prompt, query_images=query_images) + if model_response: + response_dict.update(model_response) + is_valid = True except Exception as e: logging.warning(e) - self.is_valid = False + is_valid = False response_dict.update( { - "model_output": self.model_output, - "is_valid": self.is_valid, - "response_time": self.response_time, - "n_output_tokens": self.count_tokens(), + "is_valid": is_valid, + "n_output_tokens": self.count_tokens(response_dict["model_output"], response_dict["is_valid"]), } ) return response_dict @@ -895,6 +936,7 @@ def model_template_fn(self, text_prompt, system_message=None): else: return f"<|im_start|>user<|im_sep|>\n{text_prompt}<|im_end|>\n<|im_start|>assistant<|im_sep|>" + @dataclass class LLaVAHuggingFaceModel(HuggingFaceModel): """This class is used to run a self-hosted LLaVA model via HuggingFace apis.""" @@ -960,11 +1002,15 @@ def _generate(self, text_prompt, query_images=None): end_time = time.time() sequence_length = inputs["input_ids"].shape[1] new_output_ids = output_ids[:, sequence_length:] - self.model_output = self.processor.batch_decode( + model_output = self.processor.batch_decode( new_output_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False )[0] - self.response_time = end_time - start_time + response_time = end_time - start_time + return { + "model_output": model_output, + "response_time": response_time, + } def generate(self, text_prompt, query_images=None, system_message=None): @@ -1105,7 +1151,7 @@ def get_model(self): gpu_memory_utilization=self.gpu_memory_utilization, cpu_offload_gb=self.cpu_offload_gb, ) - + def _generate(self, text_prompt, query_images=None): from vllm import SamplingParams @@ -1115,38 +1161,44 @@ def _generate(self, text_prompt, query_images=None): top_k=self.top_k, max_tokens=self.max_tokens, ) - + start_time = time.time() outputs = self.model.generate(text_prompt, sampling_params) end_time = time.time() - - self.model_output = outputs[0].outputs[0].text - self.response_time = end_time - start_time + model_output = outputs[0].outputs[0].text + response_time = end_time - start_time + return { + "model_output": model_output, + "response_time": response_time, + } def generate(self, text_prompt, query_images=None, system_message=None): response_dict = {} - + model_output = None + response_time = None + is_valid = False if text_prompt: if self.apply_model_template: text_prompt = self.model_template_fn(text_prompt, system_message) - try: - meta_response = self._generate(text_prompt, query_images=query_images) - if meta_response: - response_dict.update(meta_response) - self.is_valid = True + model_response = self._generate(text_prompt, query_images=query_images) + if model_response: + response_dict.update(model_response) + model_output = model_response["model_output"] + response_time = model_response["response_time"] + is_valid = True except Exception as e: logging.warning(e) - self.is_valid = False + is_valid = False response_dict.update( { - "model_output": self.model_output, - "is_valid": self.is_valid, - "response_time": self.response_time, - "n_output_tokens": self.count_tokens(), + "model_output": model_output, + "is_valid": is_valid, + "response_time": response_time, + "n_output_tokens": self.count_tokens(model_output, is_valid), } ) return response_dict @@ -1207,10 +1259,15 @@ def get_response(self, request): max_tokens=self.max_tokens, ) end_time = time.time() - self.model_output = completion.content[0].text - self.response_time = end_time - start_time + model_output = completion.content[0].text + response_time = end_time - start_time + response_dict = { + "model_output": model_output, + "response_time": response_time, + } if hasattr(completion, "usage"): - return {"usage": completion.usage.to_dict()} + response_dict.update({"usage": completion.usage.to_dict()}) + return response_dict def handle_request_error(self, e): return False @@ -1219,16 +1276,13 @@ def handle_request_error(self, e): @dataclass class TestModel(Model): # This class is used for testing purposes only. It only waits for a specified time and returns a response. - response_time: float = 0.1 - model_output: str = "This is a test response." - - def __post_init__(self): - self.n_output_tokens = self.count_tokens() def generate(self, text_prompt, **kwargs): + output = "This is a test response." + is_valid = True return { - "model_output": self.model_output, - "is_valid": True, - "response_time": self.response_time, - "n_output_tokens": self.n_output_tokens, + "model_output": output, + "is_valid": is_valid, + "response_time": 0.1, + "n_output_tokens": self.count_tokens(output, is_valid), } From 5c790b11c356e1e23ab9efa0ea68ffaa8bc19cb0 Mon Sep 17 00:00:00 2001 From: Safoora Yousefi Date: Sat, 15 Mar 2025 02:13:53 +0000 Subject: [PATCH 5/9] bug fixes --- eureka_ml_insights/models/models.py | 63 ++++++++++++++++++----------- 1 file changed, 40 insertions(+), 23 deletions(-) diff --git a/eureka_ml_insights/models/models.py b/eureka_ml_insights/models/models.py index a86694b8..09b46300 100644 --- a/eureka_ml_insights/models/models.py +++ b/eureka_ml_insights/models/models.py @@ -9,8 +9,8 @@ import anthropic import tiktoken -from azure.identity import get_bearer_token_provider -from azure.identity import DefaultAzureCredential +from azure.identity import DefaultAzureCredential, get_bearer_token_provider + from eureka_ml_insights.secret_management import get_secret @@ -288,7 +288,7 @@ def get_response(self, request): "response_time": response_time, } if "usage" in res: - return response_dict.update({"usage": res["usage"]}) + response_dict.update({"usage": res["usage"]}) return response_dict def handle_request_error(self, e): @@ -395,6 +395,7 @@ def create_request(self, text_prompt, query_images=None, system_message=None, pr body = str.encode(json.dumps(data)) return urllib.request.Request(self.url, body, self.headers) + @dataclass class DeepseekR1ServerlessAzureRestEndpointModel(ServerlessAzureRestEndpointModel): # setting temperature to 0.6 as suggested in https://huggingface.co/deepseek-ai/DeepSeek-R1 @@ -410,7 +411,9 @@ def create_request(self, text_prompt, query_images=None, system_message=None, pr if previous_messages: messages.extend(previous_messages) if query_images: - raise NotImplementedError("Images are not supported for DeepseekR1ServerlessAzureRestEndpointModel endpoints.") + raise NotImplementedError( + "Images are not supported for DeepseekR1ServerlessAzureRestEndpointModel endpoints." + ) messages.append({"role": "user", "content": text_prompt}) data = { "messages": messages, @@ -422,6 +425,7 @@ def create_request(self, text_prompt, query_images=None, system_message=None, pr body = str.encode(json.dumps(data)) return urllib.request.Request(self.url, body, self.headers) + @dataclass class OpenAICommonRequestResponseMixIn: """ @@ -470,7 +474,7 @@ def get_response(self, request): "response_time": response_time, } if "usage" in openai_response: - return response_dict.update({"usage": openai_response["usage"]}) + response_dict.update({"usage": openai_response["usage"]}) return response_dict @@ -489,7 +493,7 @@ def get_client(self): def handle_request_error(self, e): # if the error is due to a content filter, there is no need to retry - if hasattr(e, 'code') and e.code == "content_filter": + if hasattr(e, "code") and e.code == "content_filter": logging.warning("Content filtered.") response = None return response, False, True @@ -617,7 +621,7 @@ def get_response(self, request): "response_time": response_time, } if "usage" in openai_response: - return response_dict.update({"usage": openai_response["usage"]}) + response_dict.update({"usage": openai_response["usage"]}) return response_dict @@ -706,6 +710,7 @@ def create_request(self, text_prompt, query_images=None, system_message=None, pr def get_response(self, request): start_time = time.time() + gemini_response = None try: gemini_response = self.model.generate_content( request, @@ -717,9 +722,7 @@ def get_response(self, request): model_output = gemini_response.parts[0].text response_time = end_time - start_time except Exception as e: - is_non_transient_issue = self.handle_gemini_error(e, gemini_response) - if not is_non_transient_issue: - raise e + self.handle_gemini_error(e, gemini_response) response_dict = { "model_output": model_output, @@ -755,7 +758,7 @@ def handle_gemini_error(self, e, gemini_response): logging.warning( f"Attempt failed due to explicitly blocked input prompt: {e} Block Reason {gemini_response.prompt_feedback.block_reason}" ) - return True + # Handling cases where the model implicitly blocks prompts and does not provide an explicit block reason for it but rather an empty content. # In these cases, there is no need to make a new attempt as the model will continue to implicitly block the request, do_return = True. # Note that, in some cases, the model may still provide a finish reason as shown here https://ai.google.dev/api/generate-content?authuser=2#FinishReason @@ -771,11 +774,11 @@ def handle_gemini_error(self, e, gemini_response): logging.warning( f"Safety Ratings for the first answer candidate are: {gemini_response.candidates[0].safety_ratings}" ) - return True - # Any other case will be re attempted again, do_return = False. - return False + + raise e def handle_request_error(self, e): + # Any error case not handled in handle_gemini_error will be attempted again, do_return = False. return False @@ -1326,12 +1329,13 @@ def get_response(self, request): def handle_request_error(self, e): return False + @dataclass class ClaudeReasoningModel(ClaudeModel): """This class is used to interact with Claude reasoning models through the python api.""" model_name: str = None - temperature: float = 1. + temperature: float = 1.0 max_tokens: int = 20000 timeout: int = 600 thinking_enabled: bool = True @@ -1339,6 +1343,11 @@ class ClaudeReasoningModel(ClaudeModel): top_p: float = None def get_response(self, request): + model_output = None + response_time = None + thinking_output = None + redacted_thinking_output = None + response_dict = {} if self.top_p is not None: logging.warning("top_p is not supported for claude reasoning models as of 03/08/2025. It will be ignored.") @@ -1355,16 +1364,24 @@ def get_response(self, request): # Loop through completion.content to find the text output for content in completion.content: - if content.type == 'text': - self.model_output = content.text - elif content.type == 'thinking': - self.thinking_output = content.thinking - elif content.type == 'redacted_thinking': - self.redacted_thinking_output = content.data + if content.type == "text": + model_output = content.text + elif content.type == "thinking": + thinking_output = content.thinking + elif content.type == "redacted_thinking": + redacted_thinking_output = content.data - self.response_time = end_time - start_time + response_time = end_time - start_time + response_dict = { + "model_output": model_output, + "response_time": response_time, + "thinking_output": thinking_output, + "redacted_thinking_output": redacted_thinking_output, + } if hasattr(completion, "usage"): - return {"usage": completion.usage.to_dict()} + response_dict.update({"usage": completion.usage.to_dict()}) + return response_dict + @dataclass class TestModel(Model): From 2c45d8bb93a6584c11aadd31b775596676ee9f1a Mon Sep 17 00:00:00 2001 From: Safoora Yousefi Date: Sat, 15 Mar 2025 02:17:49 +0000 Subject: [PATCH 6/9] revert to single model uinstance --- eureka_ml_insights/core/inference.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/eureka_ml_insights/core/inference.py b/eureka_ml_insights/core/inference.py index c766c0b5..1a3bf19d 100644 --- a/eureka_ml_insights/core/inference.py +++ b/eureka_ml_insights/core/inference.py @@ -39,7 +39,6 @@ def __init__( chat_mode (bool): optional. If True, the model will be used in chat mode, where a history of messages will be maintained in "previous_messages" column. """ super().__init__(output_dir) - self.model_config = model_config self.model = model_config.class_name(**model_config.init_args) self.data_loader = data_config.class_name(**data_config.init_args) self.writer = JsonLinesWriter(os.path.join(output_dir, "inference_result.jsonl")) @@ -246,10 +245,7 @@ def run_batch(self, concurrent_inputs, concurrent_metadata, writer, executor): """ def sub_func(model_inputs): - # create a new instance of the model for thread-data-safety purposes - model = self.model_config.class_name(**self.model_config.init_args) - model.chat_mode = self.chat_mode - return model.generate(*model_inputs[0], **model_inputs[1]) + return self.model.generate(*model_inputs[0], **model_inputs[1]) results = executor.map(sub_func, concurrent_inputs) for i, result in enumerate(results): From 1751905f7801ed76b8e8306c0a088c05651099bf Mon Sep 17 00:00:00 2001 From: Safoora Yousefi Date: Sat, 15 Mar 2025 17:30:00 +0000 Subject: [PATCH 7/9] llava model thread safety --- eureka_ml_insights/models/models.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/eureka_ml_insights/models/models.py b/eureka_ml_insights/models/models.py index 09b46300..97d03214 100644 --- a/eureka_ml_insights/models/models.py +++ b/eureka_ml_insights/models/models.py @@ -1160,8 +1160,12 @@ def _generate(self, text_prompt, query_images=None, system_message=None): ) end_time = time.time() - self.model_output = self.tokenizer.batch_decode(output_ids, skip_special_tokens=True)[0].strip() - self.response_time = end_time - start_time + model_output = self.tokenizer.batch_decode(output_ids, skip_special_tokens=True)[0].strip() + response_time = end_time - start_time + return { + "model_output": model_output, + "response_time": response_time, + } @dataclass From 5d391b73db8076d61cbc392a45b3e182b02f192c Mon Sep 17 00:00:00 2001 From: Michael Harrison Date: Sat, 15 Mar 2025 18:11:10 +0000 Subject: [PATCH 8/9] refactored inference.py to not batch requests --- eureka_ml_insights/core/inference.py | 139 ++++++++++---------------- eureka_ml_insights/data_utils/data.py | 5 +- 2 files changed, 57 insertions(+), 87 deletions(-) diff --git a/eureka_ml_insights/core/inference.py b/eureka_ml_insights/core/inference.py index 1a3bf19d..f96827f6 100644 --- a/eureka_ml_insights/core/inference.py +++ b/eureka_ml_insights/core/inference.py @@ -1,12 +1,16 @@ import logging import os +import threading import time from collections import deque from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import as_completed from tqdm import tqdm +from eureka_ml_insights.configs.config import DataSetConfig, ModelConfig from eureka_ml_insights.data_utils.data import DataReader, JsonLinesWriter +from eureka_ml_insights.models.models import Model from .pipeline import Component from .reserved_names import INFERENCE_RESERVED_NAMES @@ -17,8 +21,8 @@ class Inference(Component): def __init__( self, - model_config, - data_config, + model_config: ModelConfig, + data_config: DataSetConfig, output_dir, resume_from=None, new_columns=None, @@ -39,14 +43,18 @@ def __init__( chat_mode (bool): optional. If True, the model will be used in chat mode, where a history of messages will be maintained in "previous_messages" column. """ super().__init__(output_dir) - self.model = model_config.class_name(**model_config.init_args) + self.model_config = model_config + self.model: Model = model_config.class_name(**model_config.init_args) self.data_loader = data_config.class_name(**data_config.init_args) self.writer = JsonLinesWriter(os.path.join(output_dir, "inference_result.jsonl")) + self.appender = JsonLinesWriter(os.path.join(output_dir, "inference_result.jsonl"), mode="a") self.resume_from = resume_from if resume_from and not os.path.exists(resume_from): raise FileNotFoundError(f"File {resume_from} not found.") self.new_columns = new_columns + self.pre_inf_results_df = None + self.last_uid = None # rate limiting parameters self.requests_per_minute = requests_per_minute @@ -57,6 +65,8 @@ def __init__( self.max_concurrent = max_concurrent self.chat_mode = chat_mode self.model.chat_mode = self.chat_mode + self.output_dir = output_dir + self.writer_lock = threading.Lock() @classmethod def from_config(cls, config): @@ -168,89 +178,48 @@ def retrieve_exisiting_result(self, data, pre_inf_results_df): return data def run(self): - if self.max_concurrent > 1: - self._run_par() - else: - self._run() - - def _run(self): - """sequential inference""" - if self.resume_from: - pre_inf_results_df, last_uid = self.fetch_previous_inference_results() - with self.data_loader as loader: - with self.writer as writer: - for data, model_args, model_kwargs in tqdm(loader, desc="Inference Progress:"): - if self.chat_mode and data.get("is_valid", True) is False: - continue - if self.resume_from and (data["uid"] <= last_uid): - prev_result = self.retrieve_exisiting_result(data, pre_inf_results_df) - if prev_result: - writer.write(prev_result) - continue - - # generate text from model (optionally at a limited rate) - if self.requests_per_minute: - while len(self.request_times) >= self.requests_per_minute: - # remove the oldest request time if it is older than the rate limit period - if time.time() - self.request_times[0] > self.period: - self.request_times.popleft() - else: - # rate limit is reached, wait for a second - time.sleep(1) - self.request_times.append(time.time()) - response_dict = self.model.generate(*model_args, **model_kwargs) - self.validate_response_dict(response_dict) - # write results - data.update(response_dict) - writer.write(data) - - def _run_par(self): - """parallel inference""" - concurrent_inputs = [] - concurrent_metadata = [] if self.resume_from: - pre_inf_results_df, last_uid = self.fetch_previous_inference_results() - with self.data_loader as loader: + self.pre_inf_results_df, self.last_uid = self.fetch_previous_inference_results() + with self.data_loader as loader, ThreadPoolExecutor(max_workers=self.max_concurrent) as executor: + futures = [executor.submit(self._run_single, record) for record in loader] + for future in tqdm(as_completed(futures), total=len(loader), mininterval=2.0, desc="Inference Progress: "): + result = future.result() + if result: + self._append_threadsafe(result) + + def _append_threadsafe(self, data): + with self.writer_lock: + with self.appender as appender: + appender.write(data) + + def _write_threadsafe(self, data): + with self.writer_lock: with self.writer as writer: - for data, model_args, model_kwargs in tqdm(loader, desc="Inference Progress:"): - if self.chat_mode and data.get("is_valid", True) is False: - continue - if self.resume_from and (data["uid"] <= last_uid): - prev_result = self.retrieve_exisiting_result(data, pre_inf_results_df) - if prev_result: - writer.write(prev_result) - continue - - # if batch is ready for concurrent inference - elif len(concurrent_inputs) >= self.max_concurrent: - with ThreadPoolExecutor(max_workers=self.max_concurrent) as executor: - self.run_batch(concurrent_inputs, concurrent_metadata, writer, executor) - concurrent_inputs = [] - concurrent_metadata = [] - # add data to batch for concurrent inference - concurrent_inputs.append((model_args, model_kwargs)) - concurrent_metadata.append(data) - # if data loader is exhausted but there are remaining data points that did not form a full batch - if concurrent_inputs: - with ThreadPoolExecutor(max_workers=self.max_concurrent) as executor: - self.run_batch(concurrent_inputs, concurrent_metadata, writer, executor) - - def run_batch(self, concurrent_inputs, concurrent_metadata, writer, executor): - """Run a batch of inferences concurrently using ThreadPoolExecutor. - args: - concurrent_inputs (list): list of inputs to the model.generate function. - concurrent_metadata (list): list of metadata corresponding to the inputs. - writer (JsonLinesWriter): JsonLinesWriter instance to write the results. - executor (ThreadPoolExecutor): ThreadPoolExecutor instance. - """ + writer.write(data) - def sub_func(model_inputs): - return self.model.generate(*model_inputs[0], **model_inputs[1]) + def _run_single(self, record: tuple[dict, tuple, dict]): + """Runs model.generate() with respect to a single element of the dataloader.""" - results = executor.map(sub_func, concurrent_inputs) - for i, result in enumerate(results): - data, response_dict = concurrent_metadata[i], result - self.validate_response_dict(response_dict) - # prepare results for writing - data.update(response_dict) - writer.write(data) + data, model_args, model_kwargs = record + if self.chat_mode and data.get("is_valid", True) is False: + return None + if self.resume_from and (data["uid"] <= self.last_uid): + prev_result = self.retrieve_exisiting_result(data, self.pre_inf_results_df) + if prev_result: + return prev_result + + # Rate limiter -- only for sequential inference + if self.requests_per_minute and self.max_concurrent == 1: + while len(self.request_times) >= self.requests_per_minute: + # remove the oldest request time if it is older than the rate limit period + if time.time() - self.request_times[0] > self.period: + self.request_times.popleft() + else: + # rate limit is reached, wait for a second + time.sleep(1) + self.request_times.append(time.time()) + + response_dict = self.model.generate(*model_args, **model_kwargs) + self.validate_response_dict(response_dict) + data.update(response_dict) + return data diff --git a/eureka_ml_insights/data_utils/data.py b/eureka_ml_insights/data_utils/data.py index 3a4bd1dc..b26a04d6 100644 --- a/eureka_ml_insights/data_utils/data.py +++ b/eureka_ml_insights/data_utils/data.py @@ -282,16 +282,17 @@ def load_image(self, image_file_name): class JsonLinesWriter: - def __init__(self, out_path): + def __init__(self, out_path, mode="w"): self.out_path = out_path # if the directory does not exist, create it directory = os.path.dirname(out_path) if not os.path.exists(directory): os.makedirs(directory) self.writer = None + self.mode = mode def __enter__(self): - self.writer = jsonlines.open(self.out_path, mode="w", dumps=NumpyEncoder().encode) + self.writer = jsonlines.open(self.out_path, mode=self.mode, dumps=NumpyEncoder().encode) return self.writer def __exit__(self, exc_type, exc_value, traceback): From 114e446f7d3294cf3f13e10fc9143c99f47e042d Mon Sep 17 00:00:00 2001 From: Michael Harrison Date: Mon, 17 Mar 2025 23:32:44 +0000 Subject: [PATCH 9/9] remove unused vars --- eureka_ml_insights/core/inference.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/eureka_ml_insights/core/inference.py b/eureka_ml_insights/core/inference.py index c8f1f2e4..d99e92b9 100644 --- a/eureka_ml_insights/core/inference.py +++ b/eureka_ml_insights/core/inference.py @@ -43,10 +43,8 @@ def __init__( chat_mode (bool): optional. If True, the model will be used in chat mode, where a history of messages will be maintained in "previous_messages" column. """ super().__init__(output_dir) - self.model_config = model_config self.model: Model = model_config.class_name(**model_config.init_args) self.data_loader = data_config.class_name(**data_config.init_args) - self.writer = JsonLinesWriter(os.path.join(output_dir, "inference_result.jsonl")) self.appender = JsonLinesWriter(os.path.join(output_dir, "inference_result.jsonl"), mode="a") self.resume_from = resume_from @@ -192,11 +190,6 @@ def _append_threadsafe(self, data): with self.appender as appender: appender.write(data) - def _write_threadsafe(self, data): - with self.writer_lock: - with self.writer as writer: - writer.write(data) - def _run_single(self, record: tuple[dict, tuple, dict]): """Runs model.generate() with respect to a single element of the dataloader."""