Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 39 additions & 21 deletions src/cachew/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def orjson_dumps(*args: Any, **kwargs: Any) -> bytes: # type: ignore[misc]
from .backend.common import AbstractBackend
from .backend.file import FileBackend
from .backend.sqlite import SqliteBackend
from .common import DEPENDENCIES, CachewException, SourceHash
from .common import DEPENDENCIES, CacheReadError, CachewException, SourceHash
from .logging_helper import make_logger
from .marshall.cachew import CachewMarshall

Expand Down Expand Up @@ -436,10 +436,15 @@ def cached_items(self) -> Iterator[ItemT]:
f'loading {total_cached_s}objects from cachew ({self.backend_name}:{self.resolved_cache_path})'
)

for blob in self.backend.cached_blobs():
j = orjson_loads(blob)
obj = self.marshall.load(j)
yield obj
try:
for blob in self.backend.cached_blobs():
j = orjson_loads(blob)
obj = self.marshall.load(j)
yield obj
except Exception as e:
raise CacheReadError(
f'failed to read cachew cache ({self.backend_name}:{self.resolved_cache_path}); remove the cache and try again'
) from e

def write_to_cache(self, datas: Iterable[ItemT]) -> Iterator[ItemT]:
if isinstance(self.backend, FileBackend):
Expand Down Expand Up @@ -486,14 +491,8 @@ def cachew_wrapper[**P, ItemT](
**kwargs: P.kwargs,
) -> Iterator[ItemT]:
C = _cachew_context
# fmt: off
func = C.func
cls = C.cls_
logger = C.logger
synthetic_key = C.synthetic_key
# fmt: on

used_backend = C.backend
func = C.func
logger = C.logger

if not settings.ENABLE:
logger.debug('cache explicitly disabled (settings.ENABLE is False)')
Expand All @@ -505,28 +504,38 @@ def cachew_wrapper[**P, ItemT](
yield from func(*args, **kwargs)
return

try:
resolved_cache_path = C.resolve_cache_path(*args, **kwargs)
except Exception as e:
cachew_error(e, logger=logger)
yield from func(*args, **kwargs)
return

if resolved_cache_path is None:
# user explicitly requested no caching
yield from func(*args, **kwargs)
return

synthetic_key = C.synthetic_key

# NOTE: annoyingly huge try/catch ahead...
# but it lets us save a function call, hence a stack frame
# see test_recursive*
early_exit = False
running_uncached = False
try:
resolved_cache_path = C.resolve_cache_path(*args, **kwargs)
if resolved_cache_path is None:
yield from func(*args, **kwargs)
return

BackendCls = BACKENDS[used_backend]
BackendCls = BACKENDS[C.backend]

new_hash_d = C.composite_hash(*args, **kwargs)
new_hash: SourceHash = json.dumps(new_hash_d)
logger.debug(f'new hash: {new_hash}')

marshall: CachewMarshall[ItemT] = CachewMarshall(Type_=cls)
marshall: CachewMarshall[ItemT] = CachewMarshall(Type_=C.cls_)

with BackendCls(cache_path=resolved_cache_path, logger=logger) as backend:
session = CacheSession(
backend=backend,
backend_name=used_backend,
backend_name=C.backend,
resolved_cache_path=resolved_cache_path,
marshall=marshall,
new_hash=new_hash,
Expand Down Expand Up @@ -559,7 +568,9 @@ def cachew_wrapper[**P, ItemT](
# NOTE: this is the bit we really have to watch out for and not put in a helper function
# otherwise it's causing an extra stack frame on every call
# the rest (reading from cachew or writing to cachew) happens once per function call? so not a huge deal
running_uncached = True
yield from func(*args, **kwargs)
running_uncached = False
return

# at this point we're guaranteed to have an exclusive write transaction
Expand All @@ -568,7 +579,13 @@ def cachew_wrapper[**P, ItemT](
except GeneratorExit:
early_exit = True
raise
except CacheReadError:
# Cache read failures bypass THROW_ON_ERROR because fallback can duplicate already-yielded cached items.
raise
except Exception as e:
if running_uncached:
raise

# sigh... see test_early_exit_shutdown...
if early_exit and 'Cannot operate on a closed database' in str(e):
return
Expand All @@ -580,6 +597,7 @@ def cachew_wrapper[**P, ItemT](


__all__ = [
'CacheReadError',
'CachewException',
'HashFunction',
'SourceHash',
Expand Down
10 changes: 10 additions & 0 deletions src/cachew/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@


class CachewException(RuntimeError):
# TODO rename this to CachewError for consistency with concrete error subclasses.
pass


class CacheReadError(CachewException):
"""
Cache read failures are unrecoverable and do not respect settings.THROW_ON_ERROR.
Once cached data starts yielding, falling back to the wrapped function can duplicate or mix results.
"""

pass


Expand Down
52 changes: 46 additions & 6 deletions src/cachew/tests/test_cachew.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,15 @@

from .. import (
Backend,
CacheReadError,
CachewException,
cachew,
callable_name,
get_logger,
settings,
)
from ..backend.file import FileBackend
from ..backend.sqlite import SqliteBackend

logger = get_logger()

Expand Down Expand Up @@ -1071,13 +1074,12 @@ def fun() -> Iterator[BB]:
assert calls == 1


@pytest.mark.xfail(reason='cache read errors after yielding currently restart the source iterator', strict=True)
def test_defensive_read_error_after_yield_does_not_duplicate(
def test_defensive_read_error_after_yield_raises_cache_read_error(
tmp_path: Path,
restore_settings,
) -> None:
"""
If cache reading fails after yielding an item, fallback must not restart the source iterator and duplicate emitted items.
Cache read errors are unrecoverable because fallback after yielding cached items can duplicate or mix results.
"""
settings.THROW_ON_ERROR = False

Expand Down Expand Up @@ -1114,9 +1116,47 @@ def fun() -> Iterator[Item]:
yield first
yield second

# Current buggy result is [first, first, second]: one item loaded from cache, then full fallback.
# Expected result is [first, second], with no restarted source iterator after anything was yielded.
assert list(fun()) == [first, second]
# Previous buggy behavior was [first, first, second]: one item loaded from cache, then full fallback.
# Expected behavior is a hard cache read error, even when THROW_ON_ERROR is false.
with pytest.raises(CacheReadError, match='failed to read cachew cache'):
list(fun())
assert calls == 1


def test_locked_write_uncached_exception_propagates_without_retry(
tmp_path: Path,
restore_settings,
) -> None:
"""
If cachew loses the write lock and runs uncached, wrapped function errors must not trigger defensive retry.
"""
settings.THROW_ON_ERROR = False

class UserError(Exception):
pass

calls = 0
cache_path = tmp_path / 'cache'

@cachew(cache_path, force_file=True)
def fun() -> Iterator[int]:
nonlocal calls
calls += 1
yield 1
raise UserError('boom')

backend_cls = {
'file': FileBackend,
'sqlite': SqliteBackend,
}[settings.DEFAULT_BACKEND]

with backend_cls(cache_path=cache_path, logger=logger) as backend:
assert backend.get_exclusive_write()
it = iter(fun())
assert next(it) == 1
with pytest.raises(UserError, match='boom'):
next(it)

assert calls == 1


Expand Down