Skip to content

Commit 6133acb

Browse files
committed
Add optional file-based listings caching
1 parent da77548 commit 6133acb

14 files changed

+446
-93
lines changed

docs/source/api.rst

+10-2
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,9 @@ Base Classes
4747
fsspec.core.OpenFiles
4848
fsspec.core.get_fs_token_paths
4949
fsspec.core.url_to_fs
50-
fsspec.dircache.DirCache
50+
fsspec.dircache.DisabledListingsCache
51+
fsspec.dircache.MemoryListingsCache
52+
fsspec.dircache.FileListingsCache
5153
fsspec.FSMap
5254
fsspec.generic.GenericFileSystem
5355
fsspec.registry.register_implementation
@@ -82,7 +84,13 @@ Base Classes
8284

8385
.. autofunction:: fsspec.core.url_to_fs
8486

85-
.. autoclass:: fsspec.dircache.DirCache
87+
.. autoclass:: fsspec.dircache.DisabledListingsCache
88+
:members: __init__
89+
90+
.. autoclass:: fsspec.dircache.MemoryListingsCache
91+
:members: __init__
92+
93+
.. autoclass:: fsspec.dircache.FileListingsCache
8694
:members: __init__
8795

8896
.. autoclass:: fsspec.FSMap

docs/source/changelog.rst

+8
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
11
Changelog
22
=========
33

4+
Dev
5+
--------
6+
7+
Enhancements
8+
9+
- add file-based listing cache using diskcache (#895)
10+
warning: use new ``listings_cache_options`` instead of ``use_listings_cache`` etc.
11+
412
2024.3.1
513
--------
614

docs/source/features.rst

+20-9
Original file line numberDiff line numberDiff line change
@@ -181,15 +181,26 @@ Listings Caching
181181
----------------
182182

183183
For some implementations, getting file listings (i.e., ``ls`` and anything that
184-
depends on it) is expensive. These implementations use dict-like instances of
185-
:class:`fsspec.dircache.DirCache` to manage the listings.
186-
187-
The cache allows for time-based expiry of entries with the ``listings_expiry_time``
188-
parameter, or LRU expiry with the ``max_paths`` parameter. These can be
189-
set on any implementation instance that uses listings caching; or to skip the
190-
caching altogether, use ``use_listings_cache=False``. That would be appropriate
191-
when the target location is known to be volatile because it is being written
192-
to from other sources.
184+
depends on it) is expensive. These implementations maye use either dict-like instances of
185+
:class:`fsspec.dircache.MemoryListingsCache` or file-based caching with instances of
186+
:class:`fsspec.dircache.FileListingsCache` to manage the listings.
187+
188+
The listings cache can be controlled via the keyword ``listings_cache_options`` which is a dictionary.
189+
The type of cache that is used, can be controlled via the keyword ``cache_type`` (`disabled`, `memory` or `file`).
190+
The cache allows for time-based expiry of entries with the keyword ``expiry_time``. If the target location is known to
191+
be volatile because e.g. it is being written to from other sources we recommend to disable the listings cache.
192+
If you want to use the file-based caching, you can also provide the argument
193+
``directory`` to determine where the cache file is stored.
194+
195+
Example for ``listings_cache_options``:
196+
197+
.. code-block:: json
198+
199+
{
200+
"cache_type": "file",
201+
"expiry_time": 3600,
202+
"directory": "/tmp/cache"
203+
}
193204
194205
When the ``fsspec`` instance writes to the backend, the method ``invalidate_cache``
195206
is called, so that subsequent listing of the given paths will force a refresh. In

fsspec/asyn.py

+10-2
Original file line numberDiff line numberDiff line change
@@ -312,15 +312,23 @@ class AsyncFileSystem(AbstractFileSystem):
312312
mirror_sync_methods = True
313313
disable_throttling = False
314314

315-
def __init__(self, *args, asynchronous=False, loop=None, batch_size=None, **kwargs):
315+
def __init__(
316+
self,
317+
*args,
318+
asynchronous=False,
319+
loop=None,
320+
batch_size=None,
321+
listings_cache_options=None,
322+
**kwargs,
323+
):
316324
self.asynchronous = asynchronous
317325
self._pid = os.getpid()
318326
if not asynchronous:
319327
self._loop = loop or get_loop()
320328
else:
321329
self._loop = None
322330
self.batch_size = batch_size
323-
super().__init__(*args, **kwargs)
331+
super().__init__(listings_cache_options, *args, **kwargs)
324332

325333
@property
326334
def loop(self):

fsspec/dircache.py

+140-20
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,44 @@
1+
import logging
12
import time
23
from collections.abc import MutableMapping
4+
from enum import Enum
35
from functools import lru_cache
6+
from pathlib import Path
7+
from typing import Optional, Union
48

9+
logger = logging.getLogger(__name__)
510

6-
class DirCache(MutableMapping):
11+
12+
class DisabledListingsCache(MutableMapping):
13+
def __init__(self, *args, **kwargs):
14+
pass
15+
16+
def __getitem__(self, item):
17+
raise KeyError
18+
19+
def __setitem__(self, key, value):
20+
pass
21+
22+
def __delitem__(self, key):
23+
pass
24+
25+
def __iter__(self):
26+
return iter(())
27+
28+
def __len__(self):
29+
return 0
30+
31+
def clear(self):
32+
pass
33+
34+
def __contains__(self, item):
35+
return False
36+
37+
def __reduce__(self):
38+
return (DisabledListingsCache, ())
39+
40+
41+
class MemoryListingsCache(MutableMapping):
742
"""
843
Caching of directory listings, in a structure like::
944
@@ -26,19 +61,14 @@ class DirCache(MutableMapping):
2661

2762
def __init__(
2863
self,
29-
use_listings_cache=True,
30-
listings_expiry_time=None,
64+
expiry_time=None,
3165
max_paths=None,
32-
**kwargs,
3366
):
3467
"""
3568
3669
Parameters
3770
----------
38-
use_listings_cache: bool
39-
If False, this cache never returns items, but always reports KeyError,
40-
and setting items has no effect
41-
listings_expiry_time: int or float (optional)
71+
expiry_time: int or float (optional)
4272
Time in seconds that a listing is considered valid. If None,
4373
listings do not expire.
4474
max_paths: int (optional)
@@ -49,15 +79,14 @@ def __init__(
4979
self._times = {}
5080
if max_paths:
5181
self._q = lru_cache(max_paths + 1)(lambda key: self._cache.pop(key, None))
52-
self.use_listings_cache = use_listings_cache
53-
self.listings_expiry_time = listings_expiry_time
54-
self.max_paths = max_paths
82+
self._expiry_time = expiry_time
83+
self._max_paths = max_paths
5584

5685
def __getitem__(self, item):
57-
if self.listings_expiry_time is not None:
58-
if self._times.get(item, 0) - time.time() < -self.listings_expiry_time:
86+
if self._expiry_time is not None:
87+
if self._times.get(item, 0) - time.time() < -self._expiry_time:
5988
del self._cache[item]
60-
if self.max_paths:
89+
if self._max_paths:
6190
self._q(item)
6291
return self._cache[item] # maybe raises KeyError
6392

@@ -75,12 +104,10 @@ def __contains__(self, item):
75104
return False
76105

77106
def __setitem__(self, key, value):
78-
if not self.use_listings_cache:
79-
return
80-
if self.max_paths:
107+
if self._max_paths:
81108
self._q(key)
82109
self._cache[key] = value
83-
if self.listings_expiry_time is not None:
110+
if self._expiry_time is not None:
84111
self._times[key] = time.time()
85112

86113
def __delitem__(self, key):
@@ -93,6 +120,99 @@ def __iter__(self):
93120

94121
def __reduce__(self):
95122
return (
96-
DirCache,
97-
(self.use_listings_cache, self.listings_expiry_time, self.max_paths),
123+
MemoryListingsCache,
124+
(self._expiry_time, self._max_paths),
125+
)
126+
127+
128+
class FileListingsCache(MutableMapping):
129+
def __init__(
130+
self,
131+
expiry_time: Optional[int],
132+
directory: Optional[Path],
133+
):
134+
"""
135+
136+
Parameters
137+
----------
138+
expiry_time: int or float (optional)
139+
Time in seconds that a listing is considered valid. If None,
140+
listings do not expire.
141+
directory: str (optional)
142+
Directory path at which the listings cache file is stored. If None,
143+
an autogenerated path at the user folder is created.
144+
145+
"""
146+
try:
147+
import platformdirs
148+
from diskcache import Cache
149+
except ImportError as e:
150+
raise ImportError(
151+
"The optional dependencies ``platformdirs`` and ``diskcache`` are required for file-based dircache."
152+
) from e
153+
154+
if not directory:
155+
directory = platformdirs.user_cache_dir(appname="fsspec")
156+
directory = Path(directory) / "dircache" / str(expiry_time)
157+
158+
try:
159+
directory.mkdir(exist_ok=True, parents=True)
160+
except OSError as e:
161+
logger.error(f"Directory for dircache could not be created at {directory}.")
162+
raise e
163+
else:
164+
logger.info(f"Dircache located at {directory}.")
165+
166+
self._expiry_time = expiry_time
167+
self._directory = directory
168+
self._cache = Cache(directory=str(directory))
169+
170+
def __getitem__(self, item):
171+
"""Draw item as fileobject from cache, retry if timeout occurs"""
172+
return self._cache.get(key=item, read=True, retry=True)
173+
174+
def clear(self):
175+
self._cache.clear()
176+
177+
def __len__(self):
178+
return len(list(self._cache.iterkeys()))
179+
180+
def __contains__(self, item):
181+
value = self._cache.get(item, retry=True) # None, if expired
182+
if value:
183+
return True
184+
return False
185+
186+
def __setitem__(self, key, value):
187+
self._cache.set(key=key, value=value, expire=self._expiry_time, retry=True)
188+
189+
def __delitem__(self, key):
190+
del self._cache[key]
191+
192+
def __iter__(self):
193+
return (k for k in self._cache.iterkeys() if k in self)
194+
195+
def __reduce__(self):
196+
return (
197+
FileListingsCache,
198+
(self._expiry_time, self._directory),
98199
)
200+
201+
202+
class CacheType(Enum):
203+
DISABLED = DisabledListingsCache
204+
MEMORY = MemoryListingsCache
205+
FILE = FileListingsCache
206+
207+
208+
def create_listings_cache(
209+
cache_type: CacheType,
210+
expiry_time: Optional[int],
211+
**kwargs,
212+
) -> Optional[Union[MemoryListingsCache, FileListingsCache]]:
213+
cache_map = {
214+
CacheType.DISABLED: DisabledListingsCache,
215+
CacheType.MEMORY: MemoryListingsCache,
216+
CacheType.FILE: FileListingsCache,
217+
}
218+
return cache_map[cache_type](expiry_time, **kwargs)

fsspec/implementations/http.py

+33-14
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import logging
44
import re
55
import weakref
6-
from copy import copy
76
from urllib.parse import urlparse
87

98
import aiohttp
@@ -58,6 +57,7 @@ def __init__(
5857
client_kwargs=None,
5958
get_client=get_client,
6059
encoded=False,
60+
listings_cache_options=None,
6161
**storage_options,
6262
):
6363
"""
@@ -83,11 +83,39 @@ def __init__(
8383
A callable which takes keyword arguments and constructs
8484
an aiohttp.ClientSession. It's state will be managed by
8585
the HTTPFileSystem class.
86+
listings_cache_options: dict
87+
Options for the listings cache.
8688
storage_options: key-value
8789
Any other parameters passed on to requests
8890
cache_type, cache_options: defaults used in open
8991
"""
90-
super().__init__(self, asynchronous=asynchronous, loop=loop, **storage_options)
92+
# TODO: remove in future release
93+
# Clean caching-related parameters from `storage_options`
94+
# before propagating them as `request_options` through `self.kwargs`.
95+
old_listings_cache_kwargs = {
96+
"use_listings_cache",
97+
"listings_expiry_time",
98+
"max_paths",
99+
"skip_instance_cache",
100+
}
101+
# intersection of old_listings_cache_kwargs and storage_options
102+
old_listings_cache_kwargs = old_listings_cache_kwargs.intersection(
103+
storage_options
104+
)
105+
if old_listings_cache_kwargs:
106+
logger.warning(
107+
f"The following parameters are not used anymore and will be ignored: {old_listings_cache_kwargs}. "
108+
f"Use new `listings_cache_options` instead."
109+
)
110+
for key in old_listings_cache_kwargs:
111+
del storage_options[key]
112+
super().__init__(
113+
self,
114+
asynchronous=asynchronous,
115+
loop=loop,
116+
listings_cache_options=listings_cache_options,
117+
**storage_options,
118+
)
91119
self.block_size = block_size if block_size is not None else DEFAULT_BLOCK_SIZE
92120
self.simple_links = simple_links
93121
self.same_schema = same_scheme
@@ -96,19 +124,10 @@ def __init__(
96124
self.client_kwargs = client_kwargs or {}
97125
self.get_client = get_client
98126
self.encoded = encoded
99-
self.kwargs = storage_options
100-
self._session = None
101-
102-
# Clean caching-related parameters from `storage_options`
103-
# before propagating them as `request_options` through `self.kwargs`.
104127
# TODO: Maybe rename `self.kwargs` to `self.request_options` to make
105128
# it clearer.
106-
request_options = copy(storage_options)
107-
self.use_listings_cache = request_options.pop("use_listings_cache", False)
108-
request_options.pop("listings_expiry_time", None)
109-
request_options.pop("max_paths", None)
110-
request_options.pop("skip_instance_cache", None)
111-
self.kwargs = request_options
129+
self.kwargs = storage_options
130+
self._session = None
112131

113132
@property
114133
def fsid(self):
@@ -201,7 +220,7 @@ async def _ls_real(self, url, detail=True, **kwargs):
201220
return sorted(out)
202221

203222
async def _ls(self, url, detail=True, **kwargs):
204-
if self.use_listings_cache and url in self.dircache:
223+
if url in self.dircache:
205224
out = self.dircache[url]
206225
else:
207226
out = await self._ls_real(url, detail=detail, **kwargs)

0 commit comments

Comments
 (0)