Description
A simple caching utility would be great. I'm thinking of a decorator that accepts a file pattern, plus an `overwrite` argument that controls whether existing results are overwritten on write.
Does something like this already exist? It would also be great to have this work with intake!
Proposed implementation
There's still a lot to work out here, but here's a first stab:
```python
import functools
import inspect
import os

import pandas as pd
import toolz


@toolz.curry
def cache_results(
        func,
        storage_dir=None,
        storage_pattern=None,
        create_directories=False,
        # index_col=0 so the index written by to_csv round-trips on read
        reader=functools.partial(pd.read_csv, index_col=0),
        writer=lambda x, fp, **kwargs: x.to_csv(fp, **kwargs),
        ext='.csv',
        read_kwargs=None,
        write_kwargs=None):

    read_kwargs = read_kwargs if read_kwargs is not None else {}
    write_kwargs = write_kwargs if write_kwargs is not None else {}

    @functools.wraps(func)
    def inner(*args, cache_path=None, overwrite=False, **kwargs):
        # map positional args onto their parameter names so they can be used
        # in storage_pattern (TODO: handle *args/**kwargs signatures, defaults)
        arg_names = list(inspect.signature(func).parameters)
        call_kwargs = dict(zip(arg_names, args), **kwargs)

        if cache_path is None:
            if storage_pattern is not None:
                cache_path = storage_pattern.format(**call_kwargs)
            elif storage_dir is not None:
                # TODO: create function some_hash_of_kwargs to handle different
                # orderings, usage of defaults, etc. stably
                cache_path = os.path.join(
                    storage_dir, some_hash_of_kwargs(call_kwargs) + ext)
            else:
                raise ValueError(
                    'must provide cache_path or define storage_dir '
                    'or storage_pattern at function decoration')

        if not overwrite:
            try:
                return reader(cache_path, **read_kwargs)
            except (IOError, OSError, ValueError):
                # cache miss (or unreadable file): fall through and recompute
                pass

        res = func(*args, **kwargs)
        # create parent directories just before writing, so this also
        # works when overwrite=True skips the read attempt
        directory = os.path.dirname(cache_path)
        if create_directories and directory:
            os.makedirs(directory, exist_ok=True)
        writer(res, cache_path, **write_kwargs)
        return res

    return inner
```

This could be extended with a number of format-specific decorators quite easily:
```python
import pandas as pd
import xarray as xr

# cache_results' defaults already read and write CSV
cache_in_csv = cache_results(ext='.csv')

cache_in_netcdf = cache_results(
    reader=xr.open_dataset,
    writer=lambda ds, fp, **kwargs: ds.to_netcdf(fp, **kwargs),
    ext='.nc')

cache_in_zarr = cache_results(
    reader=xr.open_zarr,
    writer=lambda ds, fp, **kwargs: ds.to_zarr(fp, **kwargs),
    ext='.zarr')

cache_in_parquet = cache_results(
    reader=pd.read_parquet,
    writer=lambda df, fp, **kwargs: df.to_parquet(fp, **kwargs),
    ext='.parquet')


def cache_in_pickle(*args, **kwargs):
    import pickle

    def reader(fp, **kw):
        with open(fp, 'rb') as f:
            return pickle.load(f)

    def writer(data, fp, **kw):
        with open(fp, 'wb') as f:
            pickle.dump(data, f)

    kwargs.setdefault('ext', '.pkl')
    return cache_results(*args, reader=reader, writer=writer, **kwargs)
```

Proposed usage
These could then be used in a variety of ways.
Decorating without arguments requires that a path be provided at call time:
```python
import numpy as np
import pandas as pd

@cache_in_csv
def generate_random_df(length):
    return pd.DataFrame({'random': np.random.random(length)})

df = generate_random_df(4, cache_path='my_length_4_df.csv')
```

Providing a storage pattern allows you to set up a complex directory structure:
```python
import xarray as xr

@cache_in_netcdf(
    storage_pattern='/data/transformed/tasmax_squared/{rcp}/{model}/{year}.nc',
    create_directories=True)
def square_tasmax(rcp, model, year):
    tasmax_pattern = '/data/source/nasa-nex/tasmax/{rcp}/{model}/{year}.nc'
    return xr.open_dataset(tasmax_pattern.format(rcp=rcp, model=model, year=year)) ** 2

results = []
for rcp in ['rcp45', 'rcp85']:
    for model in ['ACCESS1-0', 'IPSL-ESM-CHEM']:
        for year in [2020, 2050, 2090]:
            # each of these results will be cached in a different file
            results.append(((rcp, model, year), square_tasmax(rcp, model, year)))
```

We can also pass reader/writer kwargs for more complex IO:
```python
import time

import numpy as np
import pandas as pd

@cache_in_parquet(
    read_kwargs=dict(storage_options={'token': 'cloud'}),
    write_kwargs=dict(storage_options={'token': 'cloud'}))
def my_long_pandas_operation():
    time.sleep(4)
    return pd.DataFrame(np.random.random((6, 2)), columns=['a', 'b'])

df = my_long_pandas_operation(cache_path='gs://my_project/my_big_file.parquet')
```

Once the argument hashing from the TODO above is implemented, we could handle arbitrarily complex call signatures: the arguments would be hashed into a unique, stable file name, e.g.:
```python
@cache_in_pickle(storage_dir='/data/cached_noaa_api_calls')
def call_noaa_api(*args, **kwargs):
    return noaa_api.query(*args, **kwargs)
```

TODO
- Look harder to see if something like this already exists!
- Implement the hashing and arg/kwarg inspection features
- Make sure the hashing implementation is stable and never returns wrong results (e.g. for changing defaults... ugh.)
- Maybe implement staleness criteria for recomputing cached results?
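A minimal sketch of the hashing and staleness TODOs, assuming argument values are JSON-serializable (or have a `repr` that fully identifies them). `hash_call_arguments` and `is_stale` are hypothetical helpers, not existing APIs; the first could stand in for `some_hash_of_kwargs`. Binding through `inspect.signature` and applying defaults makes `f(1)`, `f(x=1)` and `f(1, 2)` hash identically when `y` defaults to 2. Because defaults are baked into the hash, changing a default should produce a cache miss rather than a wrong hit. Changes to the function body itself are not detected.

```python
import hashlib
import inspect
import json
import os
import time


def hash_call_arguments(func, args, kwargs):
    """Hypothetical helper: stable hex digest identifying one call to func.

    Assumes argument values are JSON-serializable; anything else falls back
    to repr(), which is only stable if repr() fully describes the value
    (e.g. large numpy arrays are truncated by repr and could collide).
    """
    bound = inspect.signature(func).bind(*args, **kwargs)
    # fill in defaults so omitted and explicitly passed defaults hash the same
    bound.apply_defaults()
    payload = json.dumps(
        {'func': func.__qualname__, 'arguments': bound.arguments},
        sort_keys=True,  # makes the digest independent of keyword order
        default=repr)
    return hashlib.sha256(payload.encode('utf-8')).hexdigest()


def is_stale(path, max_age_seconds):
    """Hypothetical staleness check: True if path is missing or too old."""
    try:
        age = time.time() - os.path.getmtime(path)
    except OSError:
        return True
    return age > max_age_seconds
```

Inside `inner`, the `storage_dir` branch could then build the path as `os.path.join(storage_dir, hash_call_arguments(func, args, kwargs) + ext)`, and a hypothetical `max_age` decoration argument could treat `is_stale(cache_path, max_age)` the same way as `overwrite=True`.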