
Commit a3645ff

measure performance with/without loaded local file-system cache, and add documentation

Signed-off-by: dafnapension <[email protected]>
1 parent b677c30 commit a3645ff

File tree: 6 files changed, +106 -32 lines

.github/workflows/performance.yml (+15 -2)
@@ -43,12 +43,17 @@ jobs:
       - name: Prepare the dirs for performance evaluation in main
         run: |
           mkdir -p performance_action
+          mkdir -p performance_action/hf_fs_cache
           cp performance/bluebench_profiler.py performance_action/bluebench_profiler.py
           cp performance/compare_benchmark_performance_results.py performance_action/compare_benchmark_performance_results.py

-      - name: Run performance on PR just to warm the cache, output will be overwritten
+      - name: Run performance on PR just to fill the file systems cache
+        env:
+          UNITXT_HF_LOAD_FROM_OFFLINE: "False"
+          UNITXT_HF_SAVE_TO_OFFLINE: "True"
+          UNITXT_HF_OFFLINE_DATASETS_PATH: performance_action/hf_fs_cache
         run : |
-          python performance_action/bluebench_profiler.py --output_file performance_action/pr_results.json
+          python performance_action/bluebench_profiler.py --output_file performance_action/pr_results.json --populate_fs_cache >> $GITHUB_STEP_SUMMARY

       - name: Checkout main branch
         uses: actions/checkout@v4

@@ -57,6 +62,10 @@ jobs:
           clean: false

       - name: Run performance on main branch
+        env:
+          UNITXT_HF_SAVE_TO_OFFLINE: "False"
+          UNITXT_HF_LOAD_FROM_OFFLINE: "True"
+          UNITXT_HF_OFFLINE_DATASETS_PATH: performance_action/hf_fs_cache
         run: |
           python performance_action/bluebench_profiler.py --output_file performance_action/main_results.json

@@ -67,6 +76,10 @@ jobs:
           clean: false

       - name: Run performance on PR branch
+        env:
+          UNITXT_HF_SAVE_TO_OFFLINE: "False"
+          UNITXT_HF_LOAD_FROM_OFFLINE: "True"
+          UNITXT_HF_OFFLINE_DATASETS_PATH: performance_action/hf_fs_cache
         run: |
           python performance_action/bluebench_profiler.py --output_file performance_action/pr_results.json
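The workflow thus runs the profiler once with UNITXT_HF_SAVE_TO_OFFLINE enabled to fill performance_action/hf_fs_cache, and then times both branches with UNITXT_HF_LOAD_FROM_OFFLINE, so hub download time stays out of the comparison. A rough local equivalent of that two-phase flow; the subprocess usage and output file names here are illustrative, only the env variable names and the --populate_fs_cache flag come from this commit:

    import os
    import subprocess

    CACHE_DIR = "performance_action/hf_fs_cache"   # any writable directory
    os.makedirs(CACHE_DIR, exist_ok=True)

    # Phase 1: download from the hubs and populate the local file-system cache.
    populate_env = {**os.environ,
                    "UNITXT_HF_LOAD_FROM_OFFLINE": "False",
                    "UNITXT_HF_SAVE_TO_OFFLINE": "True",
                    "UNITXT_HF_OFFLINE_DATASETS_PATH": CACHE_DIR}
    subprocess.run(["python", "performance/bluebench_profiler.py",
                    "--output_file", "performance_action/pr_results.json",
                    "--populate_fs_cache"],
                   env=populate_env, check=True)

    # Phase 2: time the pipeline while reading only from the local cache.
    measure_env = {**os.environ,
                   "UNITXT_HF_SAVE_TO_OFFLINE": "False",
                   "UNITXT_HF_LOAD_FROM_OFFLINE": "True",
                   "UNITXT_HF_OFFLINE_DATASETS_PATH": CACHE_DIR}
    subprocess.run(["python", "performance/bluebench_profiler.py",
                    "--output_file", "performance_action/pr_results.json"],
                   env=measure_env, check=True)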

docs/docs/adding_dataset.rst (+8 -1)
@@ -64,7 +64,7 @@ If a catalogued task fits your use case, you may reference it by name:
     task='tasks.translation.directed',

 Loading the Dataset
----------------------
+--------------------

 To load data from an external source, we use a loader.
 For example, to load the `wmt16` translation dataset from the HuggingFace hub:

@@ -75,6 +75,13 @@ For example, to load the `wmt16` translation dataset from the HuggingFace hub:

 More loaders for different sources are available in the :class:`loaders <unitxt.loaders>` section.

+Loading from (and saving to) local file-system
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Setting the env variable ``UNITXT_HF_LOAD_FROM_OFFLINE=true`` makes loaders fetch the data from the local file-system directory
+specified by the env variable ``UNITXT_HF_OFFLINE_DATASETS_PATH``. To have loaders save the data they fetched from an
+outside hub into that local file-system directory, set ``UNITXT_HF_SAVE_TO_OFFLINE=true``.
+
 The Preprocessing Pipeline
 ---------------------------
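In practice the variables are set before unitxt reads its settings, e.g. before the first import. A hedged usage sketch; the cache directory and the dataset query below are placeholders, not values taken from this commit:

    import os

    os.environ["UNITXT_HF_OFFLINE_DATASETS_PATH"] = "/tmp/unitxt_fs_cache"   # placeholder directory
    os.environ["UNITXT_HF_SAVE_TO_OFFLINE"] = "True"    # first run: fetch from the hub and keep a local copy

    from unitxt import load_dataset

    # placeholder dataset query; any card/template available in your catalog would do
    dataset = load_dataset("card=cards.wmt.en_de,template_card_index=0", split="test")

    # on later runs, read only from the local copy instead of the hub:
    #   UNITXT_HF_SAVE_TO_OFFLINE=false  UNITXT_HF_LOAD_FROM_OFFLINE=true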

performance/bluebench_profiler.py (+46 -12)
@@ -17,6 +17,7 @@
     CrossProviderInferenceEngine,
     InferenceEngine,
 )
+from unitxt.loaders import Loader
 from unitxt.logging_utils import get_logger
 from unitxt.operator import MultiStreamOperator
 from unitxt.settings_utils import get_settings

@@ -120,7 +121,11 @@ def collect_loaded_dataset_iterators(self, recipe: Union[DatasetRecipe, Benchmar
         if recipe.steps[1].generators:
             for stream_name in recipe.steps[1].generators:
                 if recipe.steps[1].generators[stream_name].water_mark > -1:
-                    to_ret[stream_name] = (recipe.steps[1].generators[stream_name].measured_stream.gen_kwargs["stream"].gen_kwargs["stream"], recipe.steps[1].generators[stream_name].water_mark)
+                    stream = recipe.steps[1].generators[stream_name].measured_stream
+                    while not isinstance(stream.generator.__self__, Loader):
+                        assert "stream" in stream.gen_kwargs
+                        stream = stream.gen_kwargs["stream"]
+                    to_ret[stream_name] = (stream, recipe.steps[1].generators[stream_name].water_mark)
         else:
             # recipe is a benchmark
             for subset_name in recipe.subsets:

@@ -163,14 +168,23 @@ def profiler_do_the_profiling(self, dataset_query: str, **kwargs):
         t0 = time()
         recipe = load_recipe(dataset_query, **kwargs)
         t0_25 = time()
-        recipe()
-        t0_5 = time()
         self.equip_with_watermarker(recipe)
+        t0_5 = time()
+        ms = recipe()
         t1 = time()
+        water_marks = self.collect_water_marks(recipe)
+        logger.critical(f"water marks for query {dataset_query} following recipe(): {water_marks}")
+        t1_5 = time()
         dataset = _source_to_dataset(source=recipe)
         t2 = time()
+        water_marks = self.collect_water_marks(recipe)
+        logger.critical(f"water marks for query {dataset_query} following _source_to_dataset(recipe): {water_marks}")
+        t2_5 = time()
         dataset = self.list_from_dataset(dataset)
         t3 = time()
+        water_marks = self.collect_water_marks(recipe)
+        logger.critical(f"water marks for query {dataset_query} following list out all from dataset: {water_marks}")
+        t3_5 = time()
         model = self.profiler_instantiate_model()
         t4 = time()
         if isinstance(dataset, dict):

@@ -181,31 +195,31 @@ def profiler_do_the_profiling(self, dataset_query: str, **kwargs):
             dataset = dataset[split_name]
         predictions = model.infer(dataset=dataset)
         t5 = time()
-        evaluation_result = evaluate(predictions=predictions, data=dataset)
+        evaluate(predictions=predictions, data=dataset)
         t6 = time()
         # now just streaming through recipe, without generating an HF dataset:
         ms = recipe()
         total_production_length_of_recipe = {k: len(list(ms[k])) for k in ms}
         t7 = time()
         # now just loading the specific instances actually loaded above, and listing right after recipe.loader(),
         # to report the loading time from the total processing time.
-        water_marks = self.collect_water_marks(recipe)
+        # water_marks = self.collect_water_marks(recipe)
         pulling_dict = self.collect_loaded_dataset_iterators(recipe)
         t8=time()
         self.enumerate_from_loaders(pulling_dict)
         t9 = time()
-        logger.critical(f"water marks = {water_marks}")
-        logger.critical(f"length of evaluation_result, over the returned dataset from Unitxt.load_dataset: {len(evaluation_result)}")
+        # logger.critical(f"water marks = {water_marks}")
+        # logger.critical(f"length of evaluation_result, over the returned dataset from Unitxt.load_dataset: {len(evaluation_result)}")
         logger.critical(f"lengths of total production of recipe: {total_production_length_of_recipe}")

         return {
             "load_recipe" : t0_25 - t0,
-            "recipe()": t0_5 - t0_25,
-            "source_to_dataset": t2-t1,
-            "list_out_dataset" : t3 - t2,
+            "recipe()": t1 - t0_5,
+            "source_to_dataset": t2-t1_5,
+            "list_out_dataset" : t3 - t2_5,
             "just_load_and_list": t9-t8,
             "just_stream_through_recipe": t7-t6,
-            "instantiate_model": t4 - t3,
+            "instantiate_model": t4 - t3_5,
             "inference_time" : t5 - t4,
             "evaluation_time" : t6 - t5,
         }

@@ -239,7 +253,6 @@ def profile_no_cprofile():
         res[k] += dsq_time[k]
     return {k: round(res[k], 3) for k in res}

-
 def find_cummtime_of(func_name: str, file_name: str, pst_printout: str) -> float:
     relevant_lines = list(
         filter(

@@ -312,13 +325,34 @@ def main():
         action="store_true",
         help="whether to employ cProfile or just time diffs.",
     )
+    parser.add_argument(
+        "--populate_fs_cache",
+        action="store_true",
+        help="whether to save the downloaded datasets to a file-system cache.",
+    )
     args = parser.parse_args()

     # Ensure the directory for the output file exists
     output_dir = os.path.dirname(args.output_file)
     if output_dir:
         os.makedirs(output_dir, exist_ok=True)

+    if args.populate_fs_cache:
+        assert os.path.exists(settings.hf_offline_datasets_path)
+        assert settings.hf_save_to_offline
+        t0 = time()
+        queries = dataset_query if isinstance(dataset_query, list) else [dataset_query]
+        for dsq in queries:
+            recipe = load_recipe(dsq)
+            ms = recipe()
+            for split in ms:
+                list(ms[split])
+        t1 = time()
+        print(f"Time to fetch the needed datasets from their hubs and save them in the local file-system: {round(t1-t0,3)} seconds")
+        return
+
+    if settings.hf_load_from_offline:
+        assert os.path.exists(settings.hf_offline_datasets_path)

     dict_to_print = profile_no_cprofile()
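The reworked collect_loaded_dataset_iterators no longer assumes the loader sits exactly two wrappers deep; it walks the chain of wrapped streams until the stream's generator is bound to a Loader. A toy, self-contained illustration of that unwrap-until-source pattern, using stand-in classes rather than unitxt's actual Stream and Loader types:

    class Source:                      # stands in for a unitxt Loader
        def __init__(self):
            self.generator = self.gen
            self.gen_kwargs = {}
        def gen(self):
            yield from range(3)

    class Wrapped:                     # stands in for a stream wrapping another stream
        def __init__(self, inner):
            self.generator = self.gen
            self.gen_kwargs = {"stream": inner}
        def gen(self, stream):
            yield from stream

    stream = Wrapped(Wrapped(Source()))    # arbitrary nesting depth

    # walk inward until the generator is bound to the source itself,
    # mirroring the while-loop added in collect_loaded_dataset_iterators
    while not isinstance(stream.generator.__self__, Source):
        assert "stream" in stream.gen_kwargs
        stream = stream.gen_kwargs["stream"]

    print(type(stream.generator.__self__).__name__)   # -> Source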

performance/compare_benchmark_performance_results.py (+2 -2)
@@ -24,11 +24,11 @@
 print(f"used_eager_mode in main = {main_perf['used_eager_mode']}")
 print(f"used_eager_mode in PR = {pr_perf['used_eager_mode']}")
 print(f"use Mocked inference = {os.environ['UNITXT_MOCK_INFERENCE_MODE']}")
-print("Raw datasets, that are loaded and processed here, are assumed to reside in local file ststem when the run starts.")
+print("Given the raw datasets stored in the local file system, their processing through the Unitxt pipeline lasts as detailed below.")

 ratios = {}
 for k in pr_perf:
-    if not isinstance(pr_perf, float):
+    if not isinstance(pr_perf[k], float):
         continue
     ratios[k] = pr_perf[k] / main_perf[k] if main_perf[k] > 0 else 1
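The second change fixes the filter in the ratio loop: isinstance(pr_perf, float) tested the whole dict, so every key was skipped and no ratios were ever computed; testing pr_perf[k] keeps the numeric entries. A small self-contained illustration with made-up timing values:

    # hypothetical timing dicts, shaped like the profiler output (values are illustrative)
    main_perf = {"used_eager_mode": True, "recipe()": 4.2, "inference_time": 1.1}
    pr_perf   = {"used_eager_mode": True, "recipe()": 3.9, "inference_time": 1.2}

    ratios = {}
    for k in pr_perf:
        if not isinstance(pr_perf[k], float):   # skip non-numeric entries such as flags
            continue
        ratios[k] = pr_perf[k] / main_perf[k] if main_perf[k] > 0 else 1

    print(ratios)   # e.g. {'recipe()': 0.93, 'inference_time': 1.09} (rounded)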

src/unitxt/loaders.py (+33 -15)
@@ -72,6 +72,7 @@
 from .logging_utils import get_logger
 from .operator import SourceOperator
 from .operators import Set
+from .random_utils import new_random_generator
 from .settings_utils import get_settings
 from .stream import DynamicStream, MultiStream
 from .type_utils import isoftype

@@ -85,20 +86,20 @@ def __init__(self, path):
         super().__init__(f"Loader cannot load and run remote code from {path} in huggingface without setting unitxt.settings.allow_unverified_code=True or by setting environment variable: UNITXT_ALLOW_UNVERIFIED_CODE.", Documentation.SETTINGS)

 def hf_load_dataset(path: str, *args, **kwargs):
-    if settings.hf_offline_datasets_path is not None:
-        path = os.path.join(settings.hf_offline_datasets_path, path)
     try:
         return _hf_load_dataset(
             path,
             *args, **kwargs,
             download_config=DownloadConfig(
                 max_retries=settings.loaders_max_retries,
+                cache_dir=settings.hf_offline_datasets_path if settings.hf_save_to_offline else None,
             ),
+            cache_dir=settings.hf_offline_datasets_path if settings.hf_load_from_offline else None,
             verification_mode="no_checks",
             trust_remote_code=settings.allow_unverified_code,
             download_mode= "force_redownload" if settings.disable_hf_datasets_cache else "reuse_dataset_if_exists"
         )
-    except ValueError as e:
+    except Exception as e:
         if "trust_remote_code" in str(e):
             raise UnitxtUnverifiedCodeError(path) from e

@@ -307,8 +308,8 @@ def load_dataset(
             split=split,
             num_proc=self.num_proc,
         )
-        self.__class__._loader_cache.max_size = settings.loader_cache_size
         if not disable_memory_caching:
+            self.__class__._loader_cache.max_size = settings.loader_cache_size
             self.__class__._loader_cache[dataset_id] = dataset
         return dataset

@@ -334,6 +335,7 @@ def get_splits(self):
                 download_config=DownloadConfig(
                     max_retries=settings.loaders_max_retries,
                     extract_on_the_fly=True,
+                    cache_dir = settings.hf_offline_datasets_path if settings.hf_load_from_offline else None
                 ),
             )
         except Exception as e:

@@ -409,13 +411,25 @@ def _maybe_set_classification_policy(self):
             ["proprietary"], "when loading from local files"
         )

-    def get_reader(self):
+    def get_reader(self)->callable:
         if self.file_type == "csv":
             return pd.read_csv
         if self.file_type == "json":
             return pd.read_json
         raise ValueError()

+    def get_writer(self, df:pd.DataFrame)->callable:
+        if self.file_type == "csv":
+            return df.to_csv
+        if self.file_type == "json":
+            return df.to_json
+        raise ValueError()
+
+    def get_path_to_local(self, path_to_hub:str)->str:
+        rand = new_random_generator(sub_seed=path_to_hub)
+        file_path_to_simple_string = str(rand.randint(100000, 999999))
+        return os.path.join(settings.hf_offline_datasets_path, file_path_to_simple_string+"."+self.file_type)
+
     def get_args(self):
         args = {}
         if self.file_type == "csv":

@@ -438,29 +452,33 @@ def split_generator(self, split: str) -> Generator:
         if dataset is None:
             if self.get_limit() is not None:
                 self.log_limited_loading()
+            reader = self.get_reader()
+            file_path = self.files[split]
+            if settings.hf_load_from_offline:
+                file_path = self.get_path_to_local(file_path)
             for attempt in range(settings.loaders_max_retries):
                 try:
-                    reader = self.get_reader()
-                    if self.get_limit() is not None:
-                        self.log_limited_loading()
-
                     try:
-                        dataset = reader(self.files[split], **self.get_args()).to_dict(
-                            "records"
-                        )
+                        df = reader(file_path, **self.get_args())
                         break
                     except ValueError:
                         import fsspec
-
-                        with fsspec.open(self.files[split], mode="rt") as f:
-                            dataset = reader(f, **self.get_args()).to_dict("records")
+                        with fsspec.open(file_path, mode="rt") as f:
+                            df = reader(f, **self.get_args())
                         break
                 except Exception as e:
                     logger.debug(f"Attempt csv load {attempt + 1} failed: {e}")
                     if attempt < settings.loaders_max_retries - 1:
                         time.sleep(2)
                     else:
                         raise e
+            if settings.hf_save_to_offline:
+                file_path = self.get_path_to_local(self.files[split])
+                writer = self.get_writer(df)
+                writer (file_path, index=False)
+
+            dataset = df.to_dict("records")
+
         self.__class__._loader_cache.max_size = settings.loader_cache_size
         self.__class__._loader_cache[dataset_id] = dataset
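get_path_to_local derives a stable local file name from the remote path by seeding a random generator with that path, so a file saved under UNITXT_HF_SAVE_TO_OFFLINE is found again under UNITXT_HF_LOAD_FROM_OFFLINE. A stand-alone sketch of the idea, using Python's random.Random in place of unitxt's new_random_generator; the paths below are illustrative:

    import os
    import random

    def path_to_local(path_to_hub: str, cache_dir: str, file_type: str) -> str:
        # seeding with the remote path makes the mapping deterministic across runs
        rand = random.Random(path_to_hub)
        simple_name = str(rand.randint(100000, 999999))   # short name; small collision risk
        return os.path.join(cache_dir, simple_name + "." + file_type)

    p1 = path_to_local("https://example.org/data/train.csv", "/tmp/hf_fs_cache", "csv")
    p2 = path_to_local("https://example.org/data/train.csv", "/tmp/hf_fs_cache", "csv")
    assert p1 == p2    # the same source file always maps to the same cached file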

src/unitxt/settings_utils.py (+2)
@@ -156,6 +156,8 @@ def __getattr__(self, key):
 settings.task_data_as_text = (bool, True)
 settings.default_provider = "watsonx"
 settings.default_format = None
+settings.hf_load_from_offline = (bool, False)
+settings.hf_save_to_offline = (bool, False)
 settings.hf_offline_datasets_path = None
 settings.hf_offline_metrics_path = None
 settings.hf_offline_models_path = None