
Commit 1713e79

safooray (Safoora Yousefi) and Safoora Yousefi authored
Hotfix (#49)
- Added an argument to the inference component to accommodate adding new columns to the resume_from data.
- Improved the check for column mismatch in resume_from.
- Removed token counting from the prompt_processing component for speed-up.

Co-authored-by: Safoora Yousefi <[email protected]>
1 parent c56768f commit 1713e79

Showing 11 changed files with 48 additions and 23 deletions.

eureka_ml_insights/configs/config.py (+1)

@@ -111,6 +111,7 @@ class InferenceConfig(ComponentConfig):
     data_loader_config: UtilityClassConfigType = None
     model_config: UtilityClassConfigType = None
     resume_from: str = None
+    new_columns: List[str] = None
     requests_per_minute: int = None
     max_concurrent: int = 1
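For context, a minimal sketch of how the new field might be set when resuming a run whose old results file lacks a column produced by the current code. Only the fields shown in the diff above come from the source; the placeholder objects (my_model_config, my_data_loader_config), the path, the "usage" column name, and any omitted fields inherited from ComponentConfig are assumptions for illustration.

# Hypothetical usage sketch: resume from an older results file missing a "usage" column.
inference_config = InferenceConfig(
    model_config=my_model_config,              # placeholder ModelConfig
    data_loader_config=my_data_loader_config,  # placeholder data loader config
    resume_from="logs/previous_run/inference_result.jsonl",  # placeholder path
    new_columns=["usage"],  # added to the resume_from data (filled with None) so columns match
)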

eureka_ml_insights/configs/mmmu.py (+1 −1)

@@ -83,7 +83,7 @@ def configure_pipeline(self, model_config: ModelConfig, resume_from: str = None,
             "format": ".jsonl",
             "transform": SequenceTransform(
                 [
-                    CopyColumn(column_name_src="task", column_name_dst="category"),
+                    CopyColumn(column_name_src="__hf_task", column_name_dst="category"),
                     MapStringsTransform(
                         columns=["category"],
                         mapping=MMMUTaskToCategories,

eureka_ml_insights/configs/nondeterminism.py (+1 −1)

@@ -50,6 +50,6 @@ def configure_pipeline(self, **kwargs):
         config = super().configure_pipeline(**kwargs)
         # Downsample the data and repeat each prompt 3 time
         self.data_processing_comp.data_reader_config.init_args["transform"].transforms.extend(
-            [SamplerTransform(random_seed=42, sample_count=5, stratify_by="task"), MultiplyTransform(n_repeats=3)]
+            [SamplerTransform(random_seed=42, sample_count=5, stratify_by="__hf_task"), MultiplyTransform(n_repeats=3)]
         )
         return config

eureka_ml_insights/core/inference.py (+21 −7)

@@ -10,12 +10,12 @@
 from eureka_ml_insights.data_utils.data import DataReader, JsonLinesWriter

 from .pipeline import Component
-
+from .reserved_names import INFERENCE_RESERVED_NAMES
 MINUTE = 60


 class Inference(Component):
-    def __init__(self, model_config, data_config, output_dir, resume_from=None, requests_per_minute=None, max_concurrent=1):
+    def __init__(self, model_config, data_config, output_dir, resume_from=None, new_columns=None, requests_per_minute=None, max_concurrent=1):

         """
         Initialize the Inference component.
@@ -24,6 +24,7 @@ def __init__(self, model_config, data_config, output_dir, resume_from=None, requ
             data_config (dict): DataSetConfig object.
             output_dir (str): Directory to save the inference results.
             resume_from (str): optional. Path to the file where previous inference results are stored.
+            new_columns (list): optional. List of new columns to be added to resume_from data to match the current inference response.
             requests_per_minute (int): optional. Number of inference requests to be made per minute, used for rate limiting. If not provided, rate limiting will not be applied.
             max_concurrent (int): optional. Maximum number of concurrent inferences to run. Default is 1.
         """
@@ -35,6 +36,7 @@ def __init__(self, model_config, data_config, output_dir, resume_from=None, requ
         self.resume_from = resume_from
         if resume_from and not os.path.exists(resume_from):
             raise FileNotFoundError(f"File {resume_from} not found.")
+        self.new_columns = new_columns

         # rate limiting parameters
         self.requests_per_minute = requests_per_minute
@@ -51,6 +53,7 @@ def from_config(cls, config):
             config.data_loader_config,
             config.output_dir,
             resume_from=config.resume_from,
+            new_columns=config.new_columns,
             requests_per_minute=config.requests_per_minute,
             max_concurrent=config.max_concurrent,
         )
@@ -59,7 +62,13 @@ def fetch_previous_inference_results(self):
         # fetch previous results from the provided resume_from file
         logging.info(f"Resuming inference from {self.resume_from}")
         pre_inf_results_df = DataReader(self.resume_from, format=".jsonl").load_dataset()
-
+
+        # add new columns listed by the user to the previous inference results
+        if self.new_columns:
+            for col in self.new_columns:
+                if col not in pre_inf_results_df.columns:
+                    pre_inf_results_df[col] = None
+
         # validate the resume_from contents
         with self.data_loader as loader:
             _, sample_model_input = self.data_loader.get_sample_model_input()
@@ -73,11 +82,16 @@ def fetch_previous_inference_results(self):
         sample_response_dict = self.model.generate(*sample_model_input)
         # check if the inference response dictionary contains the same keys as the resume_from file
         eventual_keys = set(sample_response_dict.keys()) | set(sample_data_keys)
-        if set(eventual_keys) != set(pre_inf_results_df.columns):
+
+        # in case of resuming from a file that was generated by an older version of the model,
+        # we let the discrepancy in the reserved keys slide and later set the missing keys to None
+        match_keys = set(pre_inf_results_df.columns) | set(INFERENCE_RESERVED_NAMES)
+
+        if set(eventual_keys) != match_keys:
+            diff = set(eventual_keys) ^ set(match_keys)
             raise ValueError(
-                f"Columns in resume_from file do not match the current inference response. "
-                f"Current inference response keys: {sample_response_dict.keys()}. "
-                f"Resume_from file columns: {pre_inf_results_df.columns}."
+                f"Columns in resume_from file do not match the current input data and inference response. "
+                f"Problematic columns: {diff}"
             )

         # find the last uid that was inferenced
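To make the relaxed check easier to follow, here is a standalone sketch of the same set logic; only the variable names and the reserved-names list mirror the diff above, the column sets are made up.

# Toy illustration of the relaxed resume_from column check.
INFERENCE_RESERVED_NAMES = ["model_output", "is_valid", "response_time", "n_output_tokens"]
eventual_keys = {"uid", "prompt", "model_output", "is_valid", "response_time", "n_output_tokens", "usage"}
resume_columns = {"uid", "prompt", "model_output", "is_valid", "usage"}  # older file, missing two reserved keys

match_keys = resume_columns | set(INFERENCE_RESERVED_NAMES)
if eventual_keys != match_keys:
    diff = eventual_keys ^ match_keys
    raise ValueError(f"Columns in resume_from file do not match: {diff}")
# No error is raised here: the only missing keys are reserved names, so the older file is
# accepted and the absent columns can later be filled with None, as the diff's comment describes.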

eureka_ml_insights/core/prompt_processing.py (−8)

@@ -1,11 +1,8 @@
 import logging
 import os
-import statistics
 from hashlib import md5
 from typing import List, Optional

-from transformers import GPT2TokenizerFast
-
 from eureka_ml_insights.data_utils import JinjaPromptTemplate

 from .data_processing import DataProcessing
@@ -70,16 +67,13 @@ def run(self) -> None:
             prompt_hashes = [compute_hash(prompt) for prompt in prompts]
         # otherwise, use the prompt data processor to generate prompts and save in the "prompt" column
         else:
-            prompt_num_tokens = []
-            tokenizer = GPT2TokenizerFast.from_pretrained("gpt2")
             with open(prompt_output_file, "w", encoding="utf-8") as writer:
                 for i, row in input_df.iterrows():

                     placeholders = row.to_dict()
                     try:
                         prompt = self.prompt_data_processor.create(placeholders)
                         success_indexes.append(i)
-                        prompt_num_tokens.append(len(tokenizer.tokenize(prompt)))
                         prompt_hashes.append(compute_hash(prompt))
                         prompts.append(prompt)
                         writer.write(prompt + "\n")
@@ -91,8 +85,6 @@ def run(self) -> None:
                     else:
                         raise e

-        logging.info(f"Average prompt num tokens: {statistics.fmean(prompt_num_tokens)}.")
-
         input_df = self.get_desired_columns(input_df)
         # Remove `model_output`, `is_valid`, `response_time`, `n_output_tokens` columns if they exists
         # in the data because these names are reserved for the inference component's use.
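Since the in-loop token counting was removed for speed, the "average prompt num tokens" statistic can still be recovered offline from the saved prompts file if it is ever needed. A hedged sketch using the same tokenizer the removed code used; the file path is a placeholder.

# Optional, offline recovery of the removed token-count statistic (not part of the commit).
from statistics import fmean
from transformers import GPT2TokenizerFast

tokenizer = GPT2TokenizerFast.from_pretrained("gpt2")
with open("prompt_output_file.txt", encoding="utf-8") as f:  # placeholder path to the saved prompts
    prompt_num_tokens = [len(tokenizer.tokenize(line.rstrip("\n"))) for line in f]
print(f"Average prompt num tokens: {fmean(prompt_num_tokens)}")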
eureka_ml_insights/core/reserved_names.py (+2 −1)

@@ -1,2 +1,3 @@
+# if your data has any of these columns, they may be removed or overwritten by Eureka
 INFERENCE_RESERVED_NAMES = ["model_output", "is_valid", "response_time", "n_output_tokens"]
-PROMPT_PROC_RESERVED_NAMES = ["prompt_hash", "prompt", "uid", "data_point_id", "data_repeat_id"]
+PROMPT_PROC_RESERVED_NAMES = ["prompt_hash", "prompt", "uid", "data_point_id", "data_repeat_id", "__hf_task", "__hf_split"]
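As a quick illustration of what these lists protect, a hedged sketch of checking a dataset for column names that collide with the reserved names; the example user columns are made up, while the two lists are copied from the diff above.

# Toy check for user columns that collide with Eureka's reserved names.
INFERENCE_RESERVED_NAMES = ["model_output", "is_valid", "response_time", "n_output_tokens"]
PROMPT_PROC_RESERVED_NAMES = ["prompt_hash", "prompt", "uid", "data_point_id", "data_repeat_id", "__hf_task", "__hf_split"]

user_columns = {"question", "answer", "uid", "__hf_split"}  # made-up dataset columns
reserved = set(INFERENCE_RESERVED_NAMES) | set(PROMPT_PROC_RESERVED_NAMES)
print(user_columns & reserved)  # {'uid', '__hf_split'} -> may be removed or overwritten by Eureka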

eureka_ml_insights/data_utils/data.py (+3 −3)

@@ -613,14 +613,14 @@ def _load_dataset(self) -> pd.DataFrame:
             hf_dataset = load_dataset(self.path, cache_dir=self.cache_dir, split=self.split)
             for i, data_split in enumerate(hf_dataset):
                 task_df = self._hf_to_dataframe(data_split)
-                task_df["split"] = self.split[i]
+                task_df["__hf_split"] = self.split[i]
                 df_frames.append(task_df)
         else:
             for task in self.tasks:
                 hf_dataset = load_dataset(self.path, task, cache_dir=self.cache_dir, split=self.split)
                 for i, data_split in enumerate(hf_dataset):
                     task_df = self._hf_to_dataframe(data_split)
-                    task_df["task"] = task
-                    task_df["split"] = self.split[i]
+                    task_df["__hf_task"] = task
+                    task_df["__hf_split"] = self.split[i]
                     df_frames.append(task_df)
         return pd.concat(df_frames)
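The bookkeeping columns added by the HF data reader are now prefixed with "__hf_", presumably so they are less likely to clash with columns already present in a dataset. A small made-up illustration of the resulting frame; the dataset contents are not from the source.

# Toy frame showing the renamed bookkeeping columns alongside a user column named "split".
import pandas as pd

task_df = pd.DataFrame({"question": ["q1", "q2"], "split": ["a", "b"]})  # hypothetical user data
task_df["__hf_task"] = "some_task"
task_df["__hf_split"] = "test"
print(list(task_df.columns))  # ['question', 'split', '__hf_task', '__hf_split'] - no clash with "split"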

eureka_ml_insights/data_utils/toxigen_utils.py (+1 −1)

@@ -68,7 +68,7 @@ def transform(self, df: pd.DataFrame) -> pd.DataFrame:
         df[[self.model_output_column, "is_valid"]] = df[self.model_output_column].apply(
             lambda x: pd.Series([parse_output(x, delimiters, True)[0], parse_output(x, delimiters, True)[1]])
         )
-        df[[self.gt_column, self.category]] = df["split"].apply(
+        df[[self.gt_column, self.category]] = df["__hf_split"].apply(
             lambda x: pd.Series([label_category_map(x)[0], label_category_map(x)[1]])
         )
         df[self.merged_group] = df[self.category] + "_" + df[self.gt_column]

eureka_ml_insights/models/models.py (+2)

@@ -480,6 +480,8 @@ def get_response(self, request):
         openai_response = completion.model_dump()
         self.model_output = openai_response["choices"][0]["message"]["content"]
         self.response_time = end_time - start_time
+        if "usage" in openai_response:
+            return {"usage": openai_response["usage"]}


 @dataclass
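This lets the usage statistics from an OpenAI-style response flow back to the caller as an extra key. A hedged sketch of how a caller could consume it; the call site and variable names are assumptions, not code from the repo.

# Hypothetical call site consuming the optional usage dict returned above.
extra = model.get_response(request)  # returns {"usage": {...}} when present, otherwise None
if extra and "usage" in extra:
    print(extra["usage"].get("total_tokens"))  # OpenAI chat-completions usage schema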

tests/pipeline_tests.py (+15)

@@ -32,6 +32,7 @@
     MetricConfig,
     ModelConfig,
     ToxiGen_Discriminative_PIPELINE,
+    ToxiGen_Generative_PIPELINE,
 )
 from eureka_ml_insights.core import Pipeline
 from eureka_ml_insights.data_utils.transform import (
@@ -259,6 +260,17 @@ def configure_pipeline(self):
         }
         return config

+class TEST_TOXIGEN_GEN_PIPELINE(ToxiGen_Generative_PIPELINE):
+    def configure_pipeline(self):
+        config = super().configure_pipeline(model_config=ModelConfig(GenericTestModel, {}))
+        self.inference_comp.data_loader_config.class_name = TestDataLoader
+        self.inference_comp.data_loader_config.init_args = {
+            "path": os.path.join(self.data_pre_processing.output_dir, "transformed_data.jsonl"),
+            "n_iter": N_ITER,
+        }
+        self.eval_inference_comp.model_config = ModelConfig(ToxiGenTestModel, {})
+        return config
+

 class TEST_MMMU_PIPELINE(MMMU_BASELINE_PIPELINE):
     # Test config the MMMU benchmark with MultipleChoiceTestModel and TestMMDataLoader
@@ -425,6 +437,9 @@ class TOXIGEN_PipelineTest(PipelineTest, unittest.TestCase):
     def get_config(self):
         return TEST_TOXIGEN_PIPELINE().pipeline_config

+class TOXIGEN_GEN_PipelineTest(PipelineTest, unittest.TestCase):
+    def get_config(self):
+        return TEST_TOXIGEN_GEN_PIPELINE().pipeline_config

 class KITAB_ONE_BOOK_CONSTRAINT_PIPELINE_PipelineTest(PipelineTest, unittest.TestCase):
     def get_config(self):
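The new generative ToxiGen pipeline test should run with the standard unittest runner like the other pipeline tests, e.g. (assuming the tests directory is importable as a package):

python -m unittest tests.pipeline_tests.TOXIGEN_GEN_PipelineTest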

tests/test_utils.py (+1 −1)

@@ -16,7 +16,7 @@ def __init__(self, model_name="generic_test_model"):

     def generate(self, text_prompt, query_images=None):
         time.sleep(0.1)
-        return {"model_output": "model output", "is_valid": True}
+        return {"model_output": "model output", "is_valid": True, "response_time": 0, "n_output_tokens": 0}


 class TestHFDataReader(HFDataReader):
